[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16497242#comment-16497242 ] ASF GitHub Bot commented on DRILL-6032: --- ilooner closed pull request #1101: DRILL-6032: Made the batch sizing for HashAgg more accurate. URL: https://github.com/apache/drill/pull/1101 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 47f1017b34..374dcdd3a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -25,7 +25,6 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; -import org.apache.drill.common.expression.FunctionHolderExpression; import org.apache.drill.common.expression.IfExpression; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.logical.data.NamedExpression; @@ -65,7 +64,7 @@ private HashAggregator aggregator; private RecordBatch incoming; - private LogicalExpression[] aggrExprs; + private ValueVectorWriteExpression[] aggrExprs; private TypedFieldId[] groupByOutFieldIds; private TypedFieldId[] aggrOutFieldIds; // field ids for the outgoing batch private final List comparators; @@ -231,7 +230,7 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? 
popConfig.getGroupByExprs().size() : 0; int numAggrExprs = (popConfig.getAggrExprs() != null) ? popConfig.getAggrExprs().size() : 0; -aggrExprs = new LogicalExpression[numAggrExprs]; +aggrExprs = new ValueVectorWriteExpression[numAggrExprs]; groupByOutFieldIds = new TypedFieldId[numGroupByExprs]; aggrOutFieldIds = new TypedFieldId[numAggrExprs]; @@ -255,7 +254,6 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, groupByOutFieldIds[i] = container.add(vv); } -int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column for (i = 0; i < numAggrExprs; i++) { NamedExpression ne = popConfig.getAggrExprs().get(i); final LogicalExpression expr = @@ -273,15 +271,10 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, continue; } - if ( expr instanceof FunctionHolderExpression ) { - String funcName = ((FunctionHolderExpression) expr).getName(); - if ( funcName.equals("sum") || funcName.equals("max") || funcName.equals("min") ) {extraNonNullColumns++;} - } final MaterializedField outputField = MaterializedField.create(ne.getRef().getAsNamePart().getName(), expr.getMajorType()); @SuppressWarnings("resource") ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator()); aggrOutFieldIds[i] = container.add(vv); - aggrExprs[i] = new ValueVectorWriteExpression(aggrOutFieldIds[i], expr, true); } @@ -301,7 +294,7 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, aggrExprs, cgInner.getWorkspaceTypes(), groupByOutFieldIds, -this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */); +container); return agg; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 4c54080169..1a33577120 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java 
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -17,20 +17,16 @@ */ package org.apache.drill.exec.physical.impl.aggregate; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import javax.inject.Named; - +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import org.apache.drill.common.exceptions.RetryAfterSpillException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; +import
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16497241#comment-16497241 ] ASF GitHub Bot commented on DRILL-6032: --- ilooner commented on issue #1101: DRILL-6032: Made the batch sizing for HashAgg more accurate. URL: https://github.com/apache/drill/pull/1101#issuecomment-393694572 While making this change I learned that it is extremely difficult to make changes to the HashAgg operator, specifically the memory calculations. After a month of work I was not able to bring even a simple change like this to a state where it would not OOM. Others experience similar frustrations when they make changes that either impact the HashAgg operator directly, or indirectly. Considering the fact that a comprehensive and stable memory calculator was created for HashJoin in less than a month, I feel that there is an easier way forward. As a result of this experience I was motivated to create https://issues.apache.org/jira/browse/DRILL-6253 which focuses on refactoring and unit testing the memory calculations for HashAgg . DRILL-6253 will fulfill the original intention of this PR, which was to make the memory calculations more accurate, and also improve the general stability and testability of HashAgg. As a result of this, I am going to abandon this PR and all future work will be done on the PR for DRILL-6253 once it is opened. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.15.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16482837#comment-16482837 ] ASF GitHub Bot commented on DRILL-6032: --- kkhatua commented on issue #1101: DRILL-6032: Made the batch sizing for HashAgg more accurate. URL: https://github.com/apache/drill/pull/1101#issuecomment-390737369 @ilooner do you have the fixes for the failure reported by @cchang738 ? It also looks like the PR needs rebasing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.14.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381044#comment-16381044 ] ASF GitHub Bot commented on DRILL-6032: --- Github user cchang738 commented on the issue: https://github.com/apache/drill/pull/1101 My test fails with OOM. @ilooner has the test log. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380835#comment-16380835 ] ASF GitHub Bot commented on DRILL-6032: --- Github user priteshm commented on the issue: https://github.com/apache/drill/pull/1101 Spoke with Chun, he will run the tests and update the PR with the test results. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379721#comment-16379721 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r171133616 --- Diff: exec/java-exec/src/main/resources/drill-module.conf --- @@ -427,8 +427,8 @@ drill.exec.options: { exec.enable_union_type: false, exec.errors.verbose: false, exec.hashagg.mem_limit: 0, -exec.hashagg.min_batches_per_partition: 2, -exec.hashagg.num_partitions: 32, +exec.hashagg.min_batches_per_partition: 1, --- End diff -- @Ben-Zvi This setting controls the minimum number of batches kept in memory per partition. Making this larger will cause us to consume more memory. Making it smaller makes us consume less memory. Also in general the purpose of this PR was to make the memory calculations more precise and deterministic and it passes all regression tests. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379694#comment-16379694 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r171130622 --- Diff: exec/java-exec/src/main/resources/drill-module.conf --- @@ -427,8 +427,8 @@ drill.exec.options: { exec.enable_union_type: false, exec.errors.verbose: false, exec.hashagg.mem_limit: 0, -exec.hashagg.min_batches_per_partition: 2, -exec.hashagg.num_partitions: 32, +exec.hashagg.min_batches_per_partition: 1, --- End diff -- This option was meant to create a "slack". **1** is the lowest value - requiring only 1 batch per each partition, i.e., no slack; so that requires the memory computations to be more precise now !! > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373319#comment-16373319 ] ASF GitHub Bot commented on DRILL-6032: --- Github user priteshm commented on the issue: https://github.com/apache/drill/pull/1101 @Ben-Zvi can you please do a final review? > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366264#comment-16366264 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on the issue: https://github.com/apache/drill/pull/1101 @Ben-Zvi I applied your review comment, please let me know if you have any more comments. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366262#comment-16366262 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r168609439 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java --- @@ -93,7 +93,7 @@ private void testSpill(long maxMem, long numPartitions, long minBatches, int max @Test public void testSimpleHashAggrSpill() throws Exception { testSpill(68_000_000, 16, 2, 2, false, - true, null, 1_200_000, 1, 1, 1, 3); + true, null, 1_200_000, 1, 2, 1, 3); --- End diff -- Fixed. I increased the memory slightly. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365042#comment-16365042 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r168363813 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java --- @@ -93,7 +93,7 @@ private void testSpill(long maxMem, long numPartitions, long minBatches, int max @Test public void testSimpleHashAggrSpill() throws Exception { testSpill(68_000_000, 16, 2, 2, false, - true, null, 1_200_000, 1, 1, 1, 3); + true, null, 1_200_000, 1, 2, 1, 3); --- End diff -- This is supposed to be a "simple spill" (see comment above); not a "recursive" one (with cycle > 1). Maybe just reducing the number of rows would do the trick > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357630#comment-16357630 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on the issue: https://github.com/apache/drill/pull/1101 @Ben-Zvi I have addressed review comments. All unit tests and functional tests (except for typical random failures are passing). I have included a commit for DRILL-6144 in this PR in order to fix other issues that were breaking our builds and validate that these changes pass all tests. After these changes are +1 I will remove the commit for DRILL-6144. I will open a separate PR to merge the changes in DRILL-6144. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354753#comment-16354753 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on the issue: https://github.com/apache/drill/pull/1101 Travis failure is unrelated > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354637#comment-16354637 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on the issue: https://github.com/apache/drill/pull/1101 @ppadma Responded to your comments. Please look at the last three commits for changes > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354394#comment-16354394 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166411194 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -733,28 +780,32 @@ private void restoreReservedMemory() { * @param records */ private void allocateOutgoing(int records) { -// Skip the keys and only allocate for outputting the workspace values -// (keys will be output through splitAndTransfer) -IteratoroutgoingIter = outContainer.iterator(); -for (int i = 0; i < numGroupByOutFields; i++) { - outgoingIter.next(); -} - // try to preempt an OOM by using the reserved memory useReservedOutgoingMemory(); long allocatedBefore = allocator.getAllocatedMemory(); -while (outgoingIter.hasNext()) { +for (int columnIndex = numGroupByOutFields; columnIndex < outContainer.getNumberOfColumns(); columnIndex++) { + final VectorWrapper wrapper = outContainer.getValueVector(columnIndex); @SuppressWarnings("resource") - ValueVector vv = outgoingIter.next().getValueVector(); + final ValueVector vv = wrapper.getValueVector(); - AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0); + final RecordBatchSizer.ColumnSize columnSizer = new RecordBatchSizer.ColumnSize(wrapper.getValueVector()); + int columnSize; + + if (columnSizer.hasKnownSize()) { +// For fixed width vectors we know the size of each record +columnSize = columnSizer.getKnownSize(); + } else { +// For var chars we need to use the input estimate +columnSize = varcharValueSizes.get(columnIndex); + } + + AllocationHelper.allocatePrecomputedChildCount(vv, records, columnSize, 0); --- End diff -- Hmm. The element count from the sizer would tell us the number of rows in the incoming batch. 
But that will not give us an accurate prediction for the number of keys we have aggregations for. Maybe we should use the number of keys in the hashtable instead. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354378#comment-16354378 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166409878 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -226,7 +221,7 @@ public BatchHolder() { ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE); } else if (vector instanceof VariableWidthVector) { // This case is never used a varchar falls under ObjectVector which is allocated on the heap ! -((VariableWidthVector) vector).allocateNew(maxColumnWidth, HashTable.BATCH_SIZE); +((VariableWidthVector) vector).allocateNew(columnSize, HashTable.BATCH_SIZE); --- End diff -- Thanks for catching this. It should use stdSize here. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354241#comment-16354241 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166389076 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java --- @@ -65,6 +70,14 @@ public int stdSize; +/** + * If the we can determine the exact width of the row of a vector upfront, + * the row widths is saved here. If we cannot determine the exact width + * (for example for VarChar or Repeated vectors), then + */ + +private int knownSize = -1; --- End diff -- Thanks for catching this. I'll use stdSize instead. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354239#comment-16354239 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166388922 --- Diff: exec/vector/src/main/codegen/templates/FixedValueVectors.java --- @@ -298,6 +298,11 @@ public int getPayloadByteCount(int valueCount) { return valueCount * ${type.width}; } + @Override + public int getValueWidth() { --- End diff -- TypeHelper doesn't return the correct size for Nullable columns. For example NullableIntVector has a width of 5 instead of 4. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354225#comment-16354225 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166387829 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -140,6 +131,9 @@ private OperatorContext oContext; private BufferAllocator allocator; + private MapkeySizes; + // The size estimates for varchar value columns. The keys are the index of the varchar value columns. + private Map varcharValueSizes; --- End diff -- As far as I know we don't support aggregations on repeated types. Varchar is the only non FixedWidth type we can aggregate. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353073#comment-16353073 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166142178 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java --- @@ -65,6 +70,14 @@ public int stdSize; +/** + * If the we can determine the exact width of the row of a vector upfront, + * the row widths is saved here. If we cannot determine the exact width + * (for example for VarChar or Repeated vectors), then + */ + +private int knownSize = -1; --- End diff -- Like I mentioned in other comment, seems like we can just use stdSize. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353076#comment-16353076 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166098364 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -140,6 +131,9 @@ private OperatorContext oContext; private BufferAllocator allocator; + private MapkeySizes; + // The size estimates for varchar value columns. The keys are the index of the varchar value columns. + private Map varcharValueSizes; --- End diff -- Don't you need to adjust size estimates for repeated types also ? > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353071#comment-16353071 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166096630 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -215,6 +206,7 @@ public BatchHolder() { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); + int columnSize = new RecordBatchSizer.ColumnSize(vector).estSize; --- End diff -- there is already stdSize which is kind of doing the same thing. can we use that instead of knownSize ? > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353072#comment-16353072 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166136279 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -226,7 +221,7 @@ public BatchHolder() { ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE); } else if (vector instanceof VariableWidthVector) { // This case is never used a varchar falls under ObjectVector which is allocated on the heap ! -((VariableWidthVector) vector).allocateNew(maxColumnWidth, HashTable.BATCH_SIZE); +((VariableWidthVector) vector).allocateNew(columnSize, HashTable.BATCH_SIZE); --- End diff -- for a just allocated vector, estSize will return 0. how can we use that for allocation ? > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353074#comment-16353074 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166141274 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -733,28 +780,32 @@ private void restoreReservedMemory() { * @param records */ private void allocateOutgoing(int records) { -// Skip the keys and only allocate for outputting the workspace values -// (keys will be output through splitAndTransfer) -IteratoroutgoingIter = outContainer.iterator(); -for (int i = 0; i < numGroupByOutFields; i++) { - outgoingIter.next(); -} - // try to preempt an OOM by using the reserved memory useReservedOutgoingMemory(); long allocatedBefore = allocator.getAllocatedMemory(); -while (outgoingIter.hasNext()) { +for (int columnIndex = numGroupByOutFields; columnIndex < outContainer.getNumberOfColumns(); columnIndex++) { + final VectorWrapper wrapper = outContainer.getValueVector(columnIndex); @SuppressWarnings("resource") - ValueVector vv = outgoingIter.next().getValueVector(); + final ValueVector vv = wrapper.getValueVector(); - AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0); + final RecordBatchSizer.ColumnSize columnSizer = new RecordBatchSizer.ColumnSize(wrapper.getValueVector()); + int columnSize; + + if (columnSizer.hasKnownSize()) { +// For fixed width vectors we know the size of each record +columnSize = columnSizer.getKnownSize(); + } else { +// For var chars we need to use the input estimate +columnSize = varcharValueSizes.get(columnIndex); + } + + AllocationHelper.allocatePrecomputedChildCount(vv, records, columnSize, 0); --- End diff -- I think we should also get elementCount from sizer and use that instead of passing 0. 
> Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353075#comment-16353075 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r166142507 --- Diff: exec/vector/src/main/codegen/templates/FixedValueVectors.java --- @@ -298,6 +298,11 @@ public int getPayloadByteCount(int valueCount) { return valueCount * ${type.width}; } + @Override + public int getValueWidth() { --- End diff -- If we are using stdSize and get the value from TypeHelper, we don't need all value vectors to have this new function. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351174#comment-16351174 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on the issue: https://github.com/apache/drill/pull/1101 An additional comment is that after the defaults were reduced the number of random failures we see in our functional tests decreased to just 3. Typically there are more than that. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351169#comment-16351169 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on the issue: https://github.com/apache/drill/pull/1101 @Ben-Zvi I have responded to comments and implemented requested changes. Please see latest commits for changes. I have made some additional changes after noticing that some of the batch sizing calculations were incorrect. I have also made various documentation and naming changes to improve the readability of the code and to document what needs to be improved in the future: - The sizes of varchar columns were not properly accounted for in the outgoing batch in the case where varchars are aggregated. I have added logic to the updateEst... method to account for this. - I have added docs to the updateEst method explaining what the various batch sizing estimates do. - I have changed the variable names for the batch size estimates to more accurately reflect what they do. - Previously if a batch size estimate was smaller than the actual amount of memory allocated, the estimate was updated to be the larger size. I think this was actually the wrong behavior since it causes the HashAgg operator's memory estimates to be extremely aggressive and in the worst case can cause the operator to run out of memory prematurely. Ideally a good estimate will over estimate 50% of the time and under estimate 50% of the time. - I have changed the HashAgg defaults. Although tests passed with the previous defaults, I felt they were too aggressive. I have changed the default number of partitions to 16 and the minimum number of batches to 1. Let me know if you have any more comments. Thanks. 
> Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350669#comment-16350669 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165704469 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -956,21 +925,8 @@ private void spillAPartition(int part) { this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value, numPendingOutput); // set the value count for outgoing batch value vectors - /* int i = 0; */ for (VectorWrapper v : outgoing) { v.getValueVector().getMutator().setValueCount(numOutputRecords); -/* --- End diff -- Thanks! > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349847#comment-16349847 ] ASF GitHub Bot commented on DRILL-6032: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165566478 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -516,43 +501,48 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti * @param incoming */ private void updateEstMaxBatchSize(RecordBatch incoming) { -if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change // Use the sizer to get the input row width and the length of the longest varchar column -RecordBatchSizer sizer = new RecordBatchSizer(incoming); -logger.trace("Incoming sizer: {}",sizer); -// An empty batch only has the schema, can not tell actual length of varchars -// else use the actual varchars length, each capped at 50 (to match the space allocation) -long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50(); +final RecordBatchSizer incomingColumnSizes = new RecordBatchSizer(incoming); +final MapcolumnSizeMap = incomingColumnSizes.getColumnSizeMap(); +keySizes = CaseInsensitiveMap.newHashMap(); -// Get approx max (varchar) column width to get better memory allocation -maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE); -maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE); +logger.trace("Incoming sizer: {}",incomingColumnSizes); -// // Calculate the estimated max (internal) batch (i.e. 
Keys batch + Values batch) size // (which is used to decide when to spill) // Also calculate the values batch size (used as a reserve to overcome an OOM) -// -Iterator outgoingIter = outContainer.iterator(); -int fieldId = 0; -while (outgoingIter.hasNext()) { - ValueVector vv = outgoingIter.next().getValueVector(); - MaterializedField mr = vv.getField(); - int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth : - TypeHelper.getSize(mr.getType()); - estRowWidth += fieldSize; - estOutputRowWidth += fieldSize; - if ( fieldId < numGroupByOutFields ) { fieldId++; } - else { estValuesRowWidth += fieldSize; } + +for (int columnIndex = 0; columnIndex < numGroupByOutFields; columnIndex++) { + final VectorWrapper vectorWrapper = outContainer.getValueVector(columnIndex); + final String columnName = vectorWrapper.getField().getName(); + final int columnSize = columnSizeMap.get(columnName).estSize; + keySizes.put(columnName, columnSize); + estOutputRowWidth += columnSize; } + +long estValuesRowWidth = 0; + +for (int columnIndex = numGroupByOutFields; columnIndex < outContainer.getNumberOfColumns(); columnIndex++) { + VectorWrapper vectorWrapper = outContainer.getValueVector(columnIndex); + RecordBatchSizer.ColumnSize columnSize = new RecordBatchSizer.ColumnSize(vectorWrapper.getValueVector()); + estOutputRowWidth += columnSize.estSize; --- End diff -- The columns in the output batch for the hash agg are usually required numerics (fixed width). But, they can be nullable in some cases (talk to Boaz.) If you are doing a max or min on a string, then the output will be a VarChar, so you have to account for the offset and data vectors. If you want to know how much memory a vector takes after creating it, just measure memory before and after the vector allocation. If you want to predict the size to do an allocation, then for fixed width you just need a row count. For VarChar, you need to predict the width also. 
You only need the "sizer" when trying to predict a vector size before you create it, and the vector has variable-width or complex parts. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349842#comment-16349842 ] ASF GitHub Bot commented on DRILL-6032: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165566070 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -956,21 +925,8 @@ private void spillAPartition(int part) { this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value, numPendingOutput); // set the value count for outgoing batch value vectors - /* int i = 0; */ for (VectorWrapper v : outgoing) { v.getValueVector().getMutator().setValueCount(numOutputRecords); -/* --- End diff -- See the `RowSet` classes. Chunhui and others have successfully used these to print bathes for debugging. Roughly: ``` RowSet rs = DirectRowSet.fromContainer(container); rs.print(); ``` This can be modified to take a row range (to avoid, say, printing 64K rows.) There are versions to do the printing using an SV2 or SV4. In the code I've not yet posted is a utility to print the contents of an offset vector. Very handy when your offsets get messed up. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349601#comment-16349601 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165532035 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -516,43 +501,48 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti * @param incoming */ private void updateEstMaxBatchSize(RecordBatch incoming) { -if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change // Use the sizer to get the input row width and the length of the longest varchar column -RecordBatchSizer sizer = new RecordBatchSizer(incoming); -logger.trace("Incoming sizer: {}",sizer); -// An empty batch only has the schema, can not tell actual length of varchars -// else use the actual varchars length, each capped at 50 (to match the space allocation) -long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50(); +final RecordBatchSizer incomingColumnSizes = new RecordBatchSizer(incoming); +final MapcolumnSizeMap = incomingColumnSizes.getColumnSizeMap(); +keySizes = CaseInsensitiveMap.newHashMap(); -// Get approx max (varchar) column width to get better memory allocation -maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE); -maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE); +logger.trace("Incoming sizer: {}",incomingColumnSizes); -// // Calculate the estimated max (internal) batch (i.e. 
Keys batch + Values batch) size // (which is used to decide when to spill) // Also calculate the values batch size (used as a reserve to overcome an OOM) -// -Iterator outgoingIter = outContainer.iterator(); -int fieldId = 0; -while (outgoingIter.hasNext()) { - ValueVector vv = outgoingIter.next().getValueVector(); - MaterializedField mr = vv.getField(); - int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth : - TypeHelper.getSize(mr.getType()); - estRowWidth += fieldSize; - estOutputRowWidth += fieldSize; - if ( fieldId < numGroupByOutFields ) { fieldId++; } - else { estValuesRowWidth += fieldSize; } + +for (int columnIndex = 0; columnIndex < numGroupByOutFields; columnIndex++) { + final VectorWrapper vectorWrapper = outContainer.getValueVector(columnIndex); + final String columnName = vectorWrapper.getField().getName(); + final int columnSize = columnSizeMap.get(columnName).estSize; + keySizes.put(columnName, columnSize); + estOutputRowWidth += columnSize; } + +long estValuesRowWidth = 0; + +for (int columnIndex = numGroupByOutFields; columnIndex < outContainer.getNumberOfColumns(); columnIndex++) { + VectorWrapper vectorWrapper = outContainer.getValueVector(columnIndex); + RecordBatchSizer.ColumnSize columnSize = new RecordBatchSizer.ColumnSize(vectorWrapper.getValueVector()); + estOutputRowWidth += columnSize.estSize; --- End diff -- We have to account for the unknown columns in the value batch estimate, but the columns in the output batch are well defined and do not contain these "extra" columns. So why do we have to augment the estimate for the output batch? Why is looking at the columns in the outputContainer insufficient? 
> Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349467#comment-16349467 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165516121 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -215,6 +206,7 @@ public BatchHolder() { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); + int columnSize = new RecordBatchSizer.ColumnSize(vector).estSize; --- End diff -- Thanks for the info @paul-rogers . Since we want to have the RecordBatchSizer be a one stop shop for size related information I propose to do the following to address @ppadma 's concerns: - Remove changes to estSize in the RecordBatchSizer that I introduced. - Add a field called knownSize to the RecordBatchSizer. This field will contain the known column width for FixedWidthVectors and will be -1 for things like varchars and repeated columns. Then we can add a method getKnownSize() for a column size. This method will return the known size if available and will throw an exception if the column is not FixedWidth. So this method should only be used by the caller when they know they are dealing with FixedWidth columns as is the case with HashAgg for now (things will probably change when strings will be stored in varchars instead of objects). The rest of RecordBatchSizer will remain as is. - Add a constant called FRAGMENTATION_FACTOR (1.33) that people can use to multiply their custom crafted batch sizes by when they are estimating batch sizes from scratch. In HashAgg I would multiply the custom batch size I compute by this factor. Thoughts? 
> Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349438#comment-16349438 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165513161 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java --- @@ -255,7 +254,6 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, groupByOutFieldIds[i] = container.add(vv); } -int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column --- End diff -- Ok. I am going to move all the code into one place though. Previously half the code was in HashAggBatch and the other half was in HashAggTemplate. I'll also add a comment that this extra code should be removed after DRILL-5728, and I will also add a comment to DRILL-5728 to remind the implementer to remove this extra code. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348039#comment-16348039 ] ASF GitHub Bot commented on DRILL-6032: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165263759 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -215,6 +206,7 @@ public BatchHolder() { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); + int columnSize = new RecordBatchSizer.ColumnSize(vector).estSize; --- End diff -- I can think of three reasons to use the sizer: * Type logic is complex: we have multiple sets of rules depending on the data type. Best to encapsulate the logic in a single place. So, either 1) use the "sizer", or 2) move the logic from the "sizer" to a common utility. * Column size is tricky as it depends on `DataMode`. The size or a `Required INT` is 4. The (total memory) size of an `Optional INT` is 5. For a `Repeated INT`? You need to know the average array cardinality, which the "sizer" provides (by analyzing an input batch.) * As discussed, variable-width columns (`VARCHAR`, `VARBINARY` for HBase) have no known size. We really have to completely forget about that awful "50" estimate. We can only estimate size from input, which is, again, what the "sizer" does. Of course, all the above only works I you actually sample the input. A current limitation (and good enhancement) is that the Sizer is aware of just one batch. The sort (the first user of the "sizer") needed only aggregate row size, so it just kept track of the widest row ever seen. If you need detailed column information, you may want another layer: one that aggregates information across batches. 
(For arrays and variable-width columns, you can take the weighted average or the maximum depending on your needs.) Remember, if the purpose of this number is to estimate memory use, then you have to add a 33% (average) allowance for internal fragmentation. (Each vector is, on average, 75% full.) > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347504#comment-16347504 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165168294 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -84,13 +85,6 @@ public abstract class HashAggTemplate implements HashAggregator { protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); - private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50; - private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8; - - private static final boolean EXTRA_DEBUG_1 = false; --- End diff -- Oh but there is! slf4j and logback have a feature called markers, which allows you to associate a tag with a statement. When you print logs you can specify to filter by level and by marker. There is a working example here https://examples.javacodegeeks.com/enterprise-java/slf4j/slf4j-markers-example/ . I will update the log statements to use markers in this PR. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347492#comment-16347492 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164617150 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -84,13 +85,6 @@ public abstract class HashAggTemplate implements HashAggregator { protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); - private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50; - private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8; - - private static final boolean EXTRA_DEBUG_1 = false; --- End diff -- The logging framework only gives error/warning/debug/trace ... there is no option for a user configurable level > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347493#comment-16347493 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165166234 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java --- @@ -255,7 +254,6 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, groupByOutFieldIds[i] = container.add(vv); } -int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column --- End diff -- Maybe do this work as a separate PR (for DRILL-5728) ? Else it would delay this PR, and overload it ... > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347447#comment-16347447 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165161146 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java --- @@ -255,7 +254,6 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, groupByOutFieldIds[i] = container.add(vv); } -int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column --- End diff -- Thanks for catching this. Then we should fix the underlying problem instead of passing around additional parameters to work around the issue. I will work on fixing the codegen for the BatchHolder as part of this PR. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347420#comment-16347420 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165156589 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -215,6 +206,7 @@ public BatchHolder() { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); + int columnSize = new RecordBatchSizer.ColumnSize(vector).estSize; --- End diff -- @ilooner That is the point. If we know the exact value, why do we need RecordBatchSizer ? we should use RecordBatchSizer when we need to get sizing information for a batch (in most cases, incoming batch). In this case, you are allocating memory for value vectors for the batch you are building. For fixed width columns, you can get the column width size for each type you are allocating memory for using TypeHelper.getSize. For variable width columns, TypeHelper.getSize assumes it is 50 bytes. If you want to adjust memory you are allocating for variable width columns for outgoing batch based on incoming batch, that's when you use RecordBatchSizer on actual incoming batch to figure out the average size of that column. You can also use RecordBatchSizer on incoming batch if you want to figure out how many values you want to allocate memory for in the outgoing batch. Note that, with your change, for just created value vectors, variable width columns will return estSize of 1, which is not what you want. 
> Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSizer to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347292#comment-16347292 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165137630 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java --- @@ -232,9 +251,8 @@ else if (width > 0) { } } - public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB - private List columnSizes = new ArrayList<>(); + private MapcolumnSizeMap = CaseInsensitiveMap.newHashMap(); --- End diff -- Thanks for the explanation here and on the dev list @paul-rogers. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347288#comment-16347288 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165136635 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -397,11 +384,9 @@ private void delayedSetup() { } numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2 -if ( schema == null ) { estValuesBatchSize = estOutgoingAllocSize = estMaxBatchSize = 0; } // incoming was an empty batch --- End diff -- All the unit and functional tests passed without an NPE. The null check was redundant because the code in **doWork** that calls **delayedSetup** sets the schema if it is null. ``` // This would be called only once - first time actual data arrives on incoming if ( schema == null && incoming.getRecordCount() > 0 ) { this.schema = incoming.getSchema(); currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch // Calculate the number of partitions based on actual incoming data delayedSetup(); } ``` So schema will never be null when delayed setup is called > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347279#comment-16347279 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r165135291 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -215,6 +206,7 @@ public BatchHolder() { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); + int columnSize = new RecordBatchSizer.ColumnSize(vector).estSize; --- End diff -- @ppadma I thought estSize represented the estimated column width. For FixedWidth vectors we know the exact column width, so why can't we use the exact value? Also why are there two different things for measuring column sizes, when do you use RecordBatchSizer and when do you use TypeHelper? > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344474#comment-16344474 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164634551 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -215,6 +206,7 @@ public BatchHolder() { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); + int columnSize = new RecordBatchSizer.ColumnSize(vector).estSize; --- End diff -- Why not just use TypeHelper.getSize(outputField.getType()) ? It seems like you don't need RecordBatchSizer for this. estSize in RecordBatchSizer is taking actual memory allocation into account i.e. it includes the over head of unused vector space and we need that value as well for different reasons. I also think it is doing right thing by returning zero when there are no rows and no memory allocated. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344403#comment-16344403 ] ASF GitHub Bot commented on DRILL-6032: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164624503 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -215,6 +206,7 @@ public BatchHolder() { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); + int columnSize = new RecordBatchSizer.ColumnSize(vector).estSize; --- End diff -- Agree that the "sizer" should return the size of a fixed-width column even if there are 0 rows. Thanks for catching and fixing this. Though, I will point out that if you get a 0-length batch, just throw it away and read another; no useful purpose is served by doing anything with an empty batch (except to pass it along for schema purposes if this is the first batch: the so-called "fast schema" that is supposed to work, but seems to not be fully implemented...) I've found that Drill behaves poorly and erratically when trying to allocate zero-sized vectors. Best not to do that. If your batch size is 0, don't allocate vectors would be my general advice to avoid lots of grief. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344399#comment-16344399 ] ASF GitHub Bot commented on DRILL-6032: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164623893 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java --- @@ -129,11 +143,16 @@ public ColumnSize(ValueVector v, String prefix) { // No standard size for Union type dataSize = v.getPayloadByteCount(valueCount); break; + case GENERIC_OBJECT: +// We cannot provide a size for Generic Objects --- End diff -- The `GENERIC_OBJECT` type is used any time we do a system table query: system tables are represented as Java objects. There is an open question about certain aggregate functions. (See a note sent a week or so ago.) These aggregate functions use an `ObjectHolder` as their `\@Workspace`. @Ben-Zvi and I discussed whether such aggregates are spillable. This may be an unresolved issue. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344397#comment-16344397 ] ASF GitHub Bot commented on DRILL-6032: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164623527 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java --- @@ -232,9 +251,8 @@ else if (width > 0) { } } - public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB - private List columnSizes = new ArrayList<>(); + private MapcolumnSizeMap = CaseInsensitiveMap.newHashMap(); --- End diff -- Drill is case insensitive internally. The case insensitive map is correct. Thanks for catching this @ilooner! Unfortunately, record batches have no name space: they are just a collection of vectors. So, we could end up with columns called both "c" and "C". This situation will case the column size map to end up with one entry for both columns, with the last writer winning. The best solution would be to enforce name space uniqueness when creating vectors. The new "result set loader" does this, but I suspect other readers might not -- depending on the particular way that they create their vectors. Still, creating names that differ only in case is a bug and any code doing that should be fixed. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344347#comment-16344347 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164615997 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java --- @@ -53,59 +51,54 @@ @Rule public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher(); -/** - * A template for Hash Aggr spilling tests - * - * @throws Exception - */ -private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel, boolean fallback ,boolean predict, - String sql, long expectedRows, int cycle, int fromPart, int toPart) throws Exception { -LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder() --- End diff -- I can add this back. In general I don't like leaving unused code in the codebase though. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344343#comment-16344343 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164615743 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java --- @@ -232,9 +251,8 @@ else if (width > 0) { } } - public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB - private List columnSizes = new ArrayList<>(); + private MapcolumnSizeMap = CaseInsensitiveMap.newHashMap(); --- End diff -- I will follow up on this on the mailing list. I was under the impression that Drill is case-**in**sensitive and did not support case sensitivity of columns in an external store like HBase or MapRDB. If Drill is case-sensitive then we have a bigger issue because some of the functional tests assume that it is case-**in**sensitive. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344338#comment-16344338 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164615077 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java --- @@ -129,11 +143,16 @@ public ColumnSize(ValueVector v, String prefix) { // No standard size for Union type dataSize = v.getPayloadByteCount(valueCount); break; + case GENERIC_OBJECT: +// We cannot provide a size for Generic Objects --- End diff -- Execution gets here in some of the HashAgg functional tests. Probably in the case where varchars are aggregated, since as you explained to me varchars are stored in object vectors on heap. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344333#comment-16344333 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164614812 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java --- @@ -805,7 +803,12 @@ private IntVector allocMetadataVector(int size, int initialValue) { } @Override - public void setMaxVarcharSize(int size) { maxVarcharSize = size; } + public void setKeySizes(MapkeySizes) { +Preconditions.checkNotNull(keySizes); + +this.keySizes = CaseInsensitiveMap.newHashMap(); --- End diff -- It helps to avoid bugs. It is assumed that the keySizes map will never change once it is set, copying the map helps enforce that constraint. If we don't copy the map and a user calls this method and passes a keySizes map and then later updates the keySizes map or reuses it, errors would occur. Some languages like Scala have immutable flavors of data structures for this reason. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344325#comment-16344325 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164613714 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -956,21 +925,8 @@ private void spillAPartition(int part) { this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value, numPendingOutput); // set the value count for outgoing batch value vectors - /* int i = 0; */ for (VectorWrapper v : outgoing) { v.getValueVector().getMutator().setValueCount(numOutputRecords); -/* --- End diff -- If this logic is really critical for debugging we should have utility methods to help with it so that others can benefit from the code. Maybe we can brainstorm about what methods would be helpful for debugging and start building a utility library. In the meantime if you need this code to debug you can easily look into the history and copy and paste it from there. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344317#comment-16344317 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164612920 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -516,43 +501,48 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti * @param incoming */ private void updateEstMaxBatchSize(RecordBatch incoming) { -if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change // Use the sizer to get the input row width and the length of the longest varchar column -RecordBatchSizer sizer = new RecordBatchSizer(incoming); -logger.trace("Incoming sizer: {}",sizer); -// An empty batch only has the schema, can not tell actual length of varchars -// else use the actual varchars length, each capped at 50 (to match the space allocation) -long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50(); +final RecordBatchSizer incomingColumnSizes = new RecordBatchSizer(incoming); +final MapcolumnSizeMap = incomingColumnSizes.getColumnSizeMap(); +keySizes = CaseInsensitiveMap.newHashMap(); -// Get approx max (varchar) column width to get better memory allocation -maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE); -maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE); +logger.trace("Incoming sizer: {}",incomingColumnSizes); -// // Calculate the estimated max (internal) batch (i.e. 
Keys batch + Values batch) size // (which is used to decide when to spill) // Also calculate the values batch size (used as a reserve to overcome an OOM) -// -Iterator outgoingIter = outContainer.iterator(); -int fieldId = 0; -while (outgoingIter.hasNext()) { - ValueVector vv = outgoingIter.next().getValueVector(); - MaterializedField mr = vv.getField(); - int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth : - TypeHelper.getSize(mr.getType()); - estRowWidth += fieldSize; - estOutputRowWidth += fieldSize; - if ( fieldId < numGroupByOutFields ) { fieldId++; } - else { estValuesRowWidth += fieldSize; } + +for (int columnIndex = 0; columnIndex < numGroupByOutFields; columnIndex++) { + final VectorWrapper vectorWrapper = outContainer.getValueVector(columnIndex); + final String columnName = vectorWrapper.getField().getName(); + final int columnSize = columnSizeMap.get(columnName).estSize; + keySizes.put(columnName, columnSize); + estOutputRowWidth += columnSize; } + +long estValuesRowWidth = 0; + +for (int columnIndex = numGroupByOutFields; columnIndex < outContainer.getNumberOfColumns(); columnIndex++) { + VectorWrapper vectorWrapper = outContainer.getValueVector(columnIndex); + RecordBatchSizer.ColumnSize columnSize = new RecordBatchSizer.ColumnSize(vectorWrapper.getValueVector()); + estOutputRowWidth += columnSize.estSize; + estValuesRowWidth += columnSize.estSize; +} + // multiply by the max number of rows in a batch to get the final estimated max size -estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE; +estMaxBatchSize = Math.max(estOutputRowWidth, 1) * MAX_BATCH_SIZE; // (When there are no aggr functions, use '1' as later code relies on this size being non-zero) +// Note: estValuesBatchSize cannot be 0 because a zero value for estValuesBatchSize will cause reserveValueBatchMemory to have a value of 0. And the meaning +// of a reserveValueBatchMemory value of 0 has multiple meanings in different contexts. 
So estValuesBatchSize has an enforced minimum value of 1, without this +// estValuesBatchSize could have a value of 0 in the case where there are no value columns and all the columns are key columns. estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE; estOutgoingAllocSize = estValuesBatchSize; // initially assume same size -logger.trace("{} phase. Estimated internal row width: {} Values row width: {} batch size: {} memory limit: {} max column width: {}", - isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth); +if (logger.isTraceEnabled()) {
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344309#comment-16344309 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164612280 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -84,13 +85,6 @@ public abstract class HashAggTemplate implements HashAggregator { protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); - private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50; - private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8; - - private static final boolean EXTRA_DEBUG_1 = false; --- End diff -- Logging frameworks were invented to solve this problem. We should just use the logging framework. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344297#comment-16344297 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164609183 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -516,43 +501,48 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti * @param incoming */ private void updateEstMaxBatchSize(RecordBatch incoming) { -if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change // Use the sizer to get the input row width and the length of the longest varchar column -RecordBatchSizer sizer = new RecordBatchSizer(incoming); -logger.trace("Incoming sizer: {}",sizer); -// An empty batch only has the schema, can not tell actual length of varchars -// else use the actual varchars length, each capped at 50 (to match the space allocation) -long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50(); +final RecordBatchSizer incomingColumnSizes = new RecordBatchSizer(incoming); +final MapcolumnSizeMap = incomingColumnSizes.getColumnSizeMap(); +keySizes = CaseInsensitiveMap.newHashMap(); -// Get approx max (varchar) column width to get better memory allocation -maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE); -maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE); +logger.trace("Incoming sizer: {}",incomingColumnSizes); -// // Calculate the estimated max (internal) batch (i.e. 
Keys batch + Values batch) size // (which is used to decide when to spill) // Also calculate the values batch size (used as a reserve to overcome an OOM) -// -Iterator outgoingIter = outContainer.iterator(); -int fieldId = 0; -while (outgoingIter.hasNext()) { - ValueVector vv = outgoingIter.next().getValueVector(); - MaterializedField mr = vv.getField(); - int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth : - TypeHelper.getSize(mr.getType()); - estRowWidth += fieldSize; - estOutputRowWidth += fieldSize; - if ( fieldId < numGroupByOutFields ) { fieldId++; } - else { estValuesRowWidth += fieldSize; } + +for (int columnIndex = 0; columnIndex < numGroupByOutFields; columnIndex++) { + final VectorWrapper vectorWrapper = outContainer.getValueVector(columnIndex); + final String columnName = vectorWrapper.getField().getName(); + final int columnSize = columnSizeMap.get(columnName).estSize; + keySizes.put(columnName, columnSize); + estOutputRowWidth += columnSize; } + +long estValuesRowWidth = 0; + +for (int columnIndex = numGroupByOutFields; columnIndex < outContainer.getNumberOfColumns(); columnIndex++) { + VectorWrapper vectorWrapper = outContainer.getValueVector(columnIndex); + RecordBatchSizer.ColumnSize columnSize = new RecordBatchSizer.ColumnSize(vectorWrapper.getValueVector()); + estOutputRowWidth += columnSize.estSize; --- End diff -- What is missing (maybe in the original code as well) is those extras: The Values batch sometimes uses those 8 byte vectors to represent nullable, and for some aggregations (e.g. 
STDDEV), internally there are two columns ("count" and "sum of squares", or something like that), but the outContainer has only one - with the result (this is why the Values vectors in the outContainer are always allocated, instead of passing the existing vectors - see DRILL-5588 ) > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344292#comment-16344292 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164571408 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java --- @@ -129,11 +143,16 @@ public ColumnSize(ValueVector v, String prefix) { // No standard size for Union type dataSize = v.getPayloadByteCount(valueCount); break; + case GENERIC_OBJECT: +// We cannot provide a size for Generic Objects --- End diff -- How can execution ever get here ? Should we throw a UserException.unsupportedError here ? > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344296#comment-16344296 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164604859 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -397,11 +384,9 @@ private void delayedSetup() { } numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2 -if ( schema == null ) { estValuesBatchSize = estOutgoingAllocSize = estMaxBatchSize = 0; } // incoming was an empty batch --- End diff -- Why was the ( schema == null ) check removed ? Without it, calling updateEstMaxBatchSize() would return an NPE when accessing the batch's schema (when calling RecordBatchSizer(incoming) ). > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344293#comment-16344293 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164576364 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java --- @@ -53,59 +51,54 @@ @Rule public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher(); -/** - * A template for Hash Aggr spilling tests - * - * @throws Exception - */ -private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel, boolean fallback ,boolean predict, - String sql, long expectedRows, int cycle, int fromPart, int toPart) throws Exception { -LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder() --- End diff -- The logBuilder is here in case someone uses the Hash Agg spill tests and needs to log. It saves the trouble to put this code back again for that purpose. Unfortunately Java does not support #ifdef .. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344295#comment-16344295 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164600675 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java --- @@ -805,7 +803,12 @@ private IntVector allocMetadataVector(int size, int initialValue) { } @Override - public void setMaxVarcharSize(int size) { maxVarcharSize = size; } + public void setKeySizes(Map<String, Integer> keySizes) { +Preconditions.checkNotNull(keySizes); + +this.keySizes = CaseInsensitiveMap.newHashMap(); --- End diff -- Minor: Why does every hash table allocate a new map and copy its values ? Can't all just share the keySizes map ? > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344298#comment-16344298 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164609742 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -84,13 +85,6 @@ public abstract class HashAggTemplate implements HashAggregator { protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); - private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50; - private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8; - - private static final boolean EXTRA_DEBUG_1 = false; --- End diff -- This is Aman's implementation of an #ifdef . Basically he wanted to preserve these debugging messages, but not seeing them (there are too many) while doing some other debugging work. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344294#comment-16344294 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164574746 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java --- @@ -232,9 +251,8 @@ else if (width > 0) { } } - public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB - private List<ColumnSize> columnSizes = new ArrayList<>(); + private Map<String, ColumnSize> columnSizeMap = CaseInsensitiveMap.newHashMap(); --- End diff -- There may be an issue with MapRDB or HBase which are case sensitive. Also Drill allows for quoted column names (e.g., "FOO" or "foo") which keep the case sensitivity. One crude "fix" can be to add a user option, so when turned on, this code would use an AbstractHashedMap instead. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344291#comment-16344291 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164549701 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java --- @@ -255,7 +254,6 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, groupByOutFieldIds[i] = container.add(vv); } -int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column --- End diff -- Why was this removed ? Unfortunately the code generator still generates an internal Bigint column to keep the "not null" status of the Aggregated Values, in the cases of sum/max/min. Below is an example of a generated code (returning a nullable varchar) - note the nonNullCount (see DRILL-5728): ``` public void outputRecordValues(int htRowIdx, int outRowIdx) throws SchemaChangeException { { NullableVarCharHolder out17; { final NullableVarCharHolder out = new NullableVarCharHolder(); vv1 .getAccessor().get((htRowIdx), work0); ObjectHolder value = work0; work4 .value = vv5 .getAccessor().get((htRowIdx)); UInt1Holder init = work4; work8 .value = vv9 .getAccessor().get((htRowIdx)); BigIntHolder nonNullCount = work8; DrillBuf buf = work12; MaxVarBytesFunctions$NullableVarCharMax_output: { if (nonNullCount.value > 0) { out.isSet = 1; org.apache.drill.exec.expr.fn.impl.DrillByteArray tmp = (org.apache.drill.exec.expr.fn.impl.DrillByteArray) value.obj; buf = buf.reallocIfNeeded(tmp.getLength()); buf.setBytes(0, tmp.getBytes(), 0, tmp.getLength()); out.start = 0; out.end = tmp.getLength(); out.buffer = buf; } else { out.isSet = 0; } } work0 = value; vv1 .getMutator().setSafe((htRowIdx), work0); work4 = init; vv5 .getMutator().set((htRowIdx), work4 .value); work8 = nonNullCount; vv9 .getMutator().set((htRowIdx), 
work8 .value); work12 = buf; out17 = out; } if (!(out17 .isSet == 0)) { vv18 .getMutator().setSafe((outRowIdx), out17 .isSet, out17 .start, out17 .end, out17 .buffer); } } } ``` > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344299#comment-16344299 ] ASF GitHub Bot commented on DRILL-6032: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164611358 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -956,21 +925,8 @@ private void spillAPartition(int part) { this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value, numPendingOutput); // set the value count for outgoing batch value vectors - /* int i = 0; */ for (VectorWrapper v : outgoing) { v.getValueVector().getMutator().setValueCount(numOutputRecords); -/* --- End diff -- We don't have any facility to take a peek at actual data for debugging. While this commented code looks ugly, it allows for easy enabling to look at data, if needed (or an example of how to do so, for elsewhere). > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344286#comment-16344286 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164611155 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -215,6 +206,7 @@ public BatchHolder() { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); + int columnSize = new RecordBatchSizer.ColumnSize(vector).estSize; --- End diff -- The goal of this code is to preallocate enough space to hold BATCH_SIZE number of elements in the batch. The BatchHolder itself is used to hold the aggregate values and aggregate values can currently only be FixedWidth vectors or ObjectVectors. Since we know how much direct memory each of these types will consume, we can use that knowledge in our column size estimate and preallocate the correct amount of space. **Note:** earlier the RecordBatchSizer would return an estSize of 0 for an empty FixedWidth value vectors. For example and empty IntVector would return an estSize of 0. This was incorrect behavior so I updated the RecordBatchSizer to return the correct size of a FixedWidth vector even when the value vector is empty. Please see the changes in the RecordBatchSizer and ValueVector templates for more details. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344203#comment-16344203 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1101#discussion_r164599681 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -215,6 +206,7 @@ public BatchHolder() { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); + int columnSize = new RecordBatchSizer.ColumnSize(vector).estSize; --- End diff -- I wonder what is wrong if estSize is 0 when there is no data. If there is no data for a column, why would we want to add it's value width to outgoing row width ? > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341886#comment-16341886 ] ASF GitHub Bot commented on DRILL-6032: --- Github user ilooner commented on the issue: https://github.com/apache/drill/pull/1101 @Ben-Zvi please review > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (DRILL-6032) Use RecordBatchSizer to estimate size of columns in HashAgg
[ https://issues.apache.org/jira/browse/DRILL-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341885#comment-16341885 ] ASF GitHub Bot commented on DRILL-6032: --- GitHub user ilooner opened a pull request: https://github.com/apache/drill/pull/1101 DRILL-6032: Made the batch sizing for HashAgg more accurate. ## RecordBatchSizer changes - The RecordBatchSizer previously computed fixed width column sizes by measuring the total size of a vector and dividing by the number of elements. Because of this the RecordBatchSizer would return a zero size for FixedWidth vectors that had no data. So I added a method to FixedWidth vectors to get the size of a record and use that method to compute the column width in the RecordBatchSizer. - In some cases it was possible for the RecordBatchSizer to return a column width of 0, when it is not possible to have vectors with a width of 1 in practice. So I made the minimum column width returned by the RecordBatchSizer 1. ## HashAgg changes - Removed commented out code and unused variables. - Removed if statements for printing debug statements and instead used logger.debug - Removed the extraNonNullColumns and extraRowBytes tweak parameters for computing the sizes of batches - The RecordBatchSizer is used to compute the width of each column instead of adhoc custom logic. 
- Using the real width of each column to estimate column sizes instead of taking the max width of all columns and assuming each column has the max width - Removed the assumption that varchars will not exceed 50 characters in length - Removed unnecessary condition checks in delayedSetup and updateEstMaxBatchSize You can merge this pull request into a Git repository by running: $ git pull https://github.com/ilooner/drill DRILL-6032 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/1101.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1101 commit b8682f6e09889e5ba334f36006fb9ed754f571f6 Author: Timothy Farkas Date: 2017-12-13T23:44:28Z DRILL-6032: Made the batch sizing for HashAgg more accurate. > Use RecordBatchSizer to estimate size of columns in HashAgg > --- > > Key: DRILL-6032 > URL: https://issues.apache.org/jira/browse/DRILL-6032 > Project: Apache Drill > Issue Type: Improvement >Reporter: Timothy Farkas >Assignee: Timothy Farkas >Priority: Major > Fix For: 1.13.0 > > > We need to use the RecordBatchSize to estimate the size of columns in the > Partition batches created by HashAgg. -- This message was sent by Atlassian JIRA (v7.6.3#76005)