[jira] [Commented] (DRILL-6238) Batch sizing for operators

2018-03-17 Thread Paul Rogers (JIRA)

[ https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403832#comment-16403832 ]

Paul Rogers commented on DRILL-6238:


External sort: it already has limits. It is the granddaddy of limits; it 
developed the techniques that you are using. It has no row-based limits; all 
batches are sized on memory. It does have multiple sizes (one size for spill 
batches, another for output batches).

> Batch sizing for operators
> --
>
> Key: DRILL-6238
> URL: https://issues.apache.org/jira/browse/DRILL-6238
> Project: Apache Drill
>  Issue Type: New Feature
>Reporter: Padma Penumarthy
>Assignee: Padma Penumarthy
>Priority: Major
>
> *Batch Sizing For Operators*
> This document describes the approach we are taking to limit batch sizes for 
> operators other than scan.
> *Motivation*
> The main goals are
>  # Improve concurrency
>  # Reduce query failures caused by out-of-memory errors
> To accomplish these goals, we need to make queries execute within a specified 
> memory budget. To enforce a per-query memory limit, we need to be able to 
> enforce per-fragment and per-operator memory limits. Controlling individual 
> operators' batch sizes is the first step toward all of this.
> *Background*
> In Drill, different operators have different limits w.r.t. outgoing batches. 
> Some use hard-coded row counts, some use hard-coded memory limits, and some 
> have none at all. Because no limits are imposed, the memory used by an 
> outgoing batch can vary widely with the input data size and with what the 
> operator is doing. Queries fail because we are not able to allocate the 
> memory needed. Some operators produce very large batches, causing blocking 
> operators like sort and hash aggregate, which have to work under tight 
> memory constraints, to fail. The size of batches should be a function of 
> available memory rather than of input data size and/or what the operator 
> does. Please refer to the table at the end of this document for details on 
> what each operator does today.
> *Design*
> The goal is to have all operators behave the same way, i.e. produce batches 
> no larger than the configured outgoing batch size, with a minimum of 1 row 
> per batch and a maximum of 64K rows per batch. A new system option 
> ‘drill.exec.memory.operator.output_batch_size’ is added, with a default 
> value of 16 MB.
> The basic idea is to limit the size of the outgoing batch by deciding how 
> many rows it can hold, based on the average entry size of each outgoing 
> column, taking into account the actual data size and the metadata-vector 
> overhead we add on top for tracking variable length, mode (repeated, 
> optional, required), etc. This calculation will be different for each 
> operator and is based on (see the sketch after this list)
>  # What the operator is doing
>  # Incoming batch size, which includes information on the type and average 
> size of each column
>  # What is being projected out
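> As a rough illustration of the calculation, here is a minimal sketch; the 
> class, method, and constant names are hypothetical, not Drill's actual API:
> {code:java}
> // Sketch: choose the outgoing row count from the configured batch size
> // and the average per-row width observed on the incoming batch.
> public class OutputRowCountSketch {
>   static final int MIN_ROWS = 1;
>   static final int MAX_ROWS = 64 * 1024;  // 64K value-vector row limit
>   // Default of the drill.exec.memory.operator.output_batch_size option
>   static final long DEFAULT_BATCH_BYTES = 16L * 1024 * 1024;  // 16 MB
> 
>   // avgRowBytes: average entry size per row across all projected columns,
>   // including offset and bit vectors for variable-length/optional modes
>   static int outgoingRowCount(long batchBytes, long avgRowBytes) {
>     long rows = batchBytes / Math.max(1, avgRowBytes);
>     return (int) Math.max(MIN_ROWS, Math.min(MAX_ROWS, rows));
>   }
> 
>   public static void main(String[] args) {
>     // 500-byte average rows in a 16 MB budget -> 33,554 rows, under 64K
>     System.out.println(outgoingRowCount(DEFAULT_BATCH_BYTES, 500));
>   }
> }
> {code}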
> With this adaptive approach based on actual average data sizes, operators 
> that previously limited batches to fewer than 64K rows can possibly fit many 
> more rows (up to 64K) in a batch while the memory stays within the budget. 
> For example, flatten and the joins use a batch size of 4K rows, which was 
> probably chosen to be conservative w.r.t. memory usage. Letting these 
> operators go up to 64K rows, as long as they stay within the memory budget, 
> should help improve performance.
> Also, to improve performance and utilize memory more efficiently, we will 
> (see the round-down sketch after this list)
>  # Allocate memory for value vectors upfront. Since we know the number of 
> rows and the sizing information for each column in the outgoing batch, we 
> will use that information to allocate memory for the value vectors upfront. 
> Currently, we either do an initial allocation for 4K values and double it 
> every time we need more, or we allocate the maximum needed upfront. By 
> pre-allocating memory based on the sizing calculation, we can improve 
> performance by avoiding the memory copies, and the zeroing of the new half, 
> that we do every time we double, and we save memory in cases where we were 
> over-allocating before.
>  # Round the number of rows in the outgoing batch down to a power of two. 
> Since memory is allocated in powers of two, this will help us pack the value 
> vectors densely, reducing the amount of memory wasted by the doubling 
> effect.
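> For step 2, a minimal sketch of the round-down ({{Integer.highestOneBit}} 
> returns the largest power of two less than or equal to its argument; the 
> surrounding method is illustrative, not Drill's actual code):
> {code:java}
> // Round the computed row count down to a power of two so value vectors
> // densely fill their power-of-two allocations.
> static int roundDownToPowerOfTwo(int rowCount) {
>   // e.g. an estimate of 33,554 rows becomes a 32,768-row batch
>   return Math.max(1, Integer.highestOneBit(rowCount));
> }
> {code}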
> So, to summarize, the benefits we will get are improved memory utilization, 
> better performance, higher concurrency, and fewer queries dying because of 
> out-of-memory errors.
> Note: Since these sizing calculations are based on averages, strict memory 
> usage enforcement is not possible. There could be pathological cases where, 
> because of uneven data distribution, we exceed the configured output batch 
> size, potentially causing OOM errors and problems in downstream operators.
> Other issues that will be addressed:
>  * We are adding extra processing for each

[jira] [Commented] (DRILL-6238) Batch sizing for operators

2018-03-17 Thread Paul Rogers (JIRA)

[ https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403831#comment-16403831 ]

Paul Rogers commented on DRILL-6238:


In the table, it would be worth adding a note to some operators to explain 
why there is no limit. For example, Filter simply tags an existing batch; it 
does not create a new one, so there is nothing to limit.

Merge, however, must combine incoming batches to produce new batches. The 
resulting batch is fed into the next downstream operator. Shouldn't we limit 
the merge batch size to prevent blowing up the downstream operator?

Perhaps some of the "no limit" cases are due to the issue above (nothing to 
limit). Perhaps some are due to technical complexity (no limit for merge 
because it is too complex to change). Perhaps some are based on priority 
(say, we could limit joins, but we're adding spilling anyway, so let's worry 
about limits later). Would be worth calling out each of these cases.
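
To illustrate the Filter point with a toy example (plain Java, not Drill's 
actual selection-vector machinery): a filter can record just the indexes of 
qualifying rows and leave the underlying value vectors untouched, so there is 
no new allocation to limit.

{code:java}
import java.util.Arrays;

public class FilterSketch {
  // "Filter" by remembering row indexes; no data is copied, so the
  // operator creates no new batch whose size would need a cap.
  static int[] buildSelection(int[] column, int threshold) {
    int[] selection = new int[column.length];
    int count = 0;
    for (int i = 0; i < column.length; i++) {
      if (column[i] > threshold) {
        selection[count++] = i;  // record the row's index only
      }
    }
    return Arrays.copyOf(selection, count);
  }
}
{code}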


[jira] [Commented] (DRILL-6238) Batch sizing for operators

2018-03-17 Thread Paul Rogers (JIRA)

[ https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403830#comment-16403830 ]

Paul Rogers commented on DRILL-6238:


Regarding "higher concurrency"... Can you explain a bit more how better memory 
management improves concurrency? Not entirely obvious to the causal observer...


[jira] [Commented] (DRILL-6238) Batch sizing for operators

2018-03-17 Thread Paul Rogers (JIRA)

[ https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403829#comment-16403829 ]

Paul Rogers commented on DRILL-6238:


Regarding:

bq. Round down the number of rows in outgoing batch to a power of two. Since 
memory is allocated in powers of two, this will help us pack the value vectors 
densely...

The idea is fine for rows that consist only of integers or floats. But dates 
often have non-power-of-two sizes (such as 12). Also, {{VARCHAR}} vector 
sizes are effectively random.

The challenge is this. Suppose that we figure out that a row has a {{VARCHAR}} 
column. We can fit x rows of text data before we hit our overall size limit. We 
then round down to a power of two. This means that non-text vectors may be full 
(at least those of size 4 or 8 bytes), but the data vectors for {{VARCHAR}} may 
be quite empty.

This is particularly true if the {{VARCHAR}} data dominates the row size. If we 
figure we can add, say, 2000 {{VARCHAR}} values of 1000 bytes each, then if we 
round down to 1024 values total, we'll end up wasting about half of the data 
vector size for the {{VARCHAR}}.

We can, of course, do this in multiple steps (sketched in code below):

* Calculate the ideal row count based on batch size / row size.
* Round down to a power of two.
* Recompute the {{VARCHAR}} data sizes as row count * size of that column.
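
A minimal sketch of these steps (the names and the helper are illustrative, 
not the result set loader's actual API):

{code:java}
// Steps: (1) ideal row count, (2) round down to a power of two,
// (3) recompute the VARCHAR data bytes for the rounded count.
static long varcharBytesAfterRoundDown(long batchBytes, long avgRowBytes,
                                       long avgVarcharBytes) {
  long ideal = batchBytes / Math.max(1, avgRowBytes);      // step 1
  long rounded = Long.highestOneBit(Math.max(1, ideal));   // step 2
  // step 3: e.g. 2000 ideal rows of 1000-byte VARCHARs round to 1024 rows,
  // using ~1 MB of a data vector that was sized for ~2 MB
  return rounded * avgVarcharBytes;
}
{code}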

Either way, we're going to waste space. It would be good to know which 
approach wastes less: rounding down, or simply stopping when we hit the size 
limit. Based on that outcome, we may want to modify how the result set loader 
works if the round-down method works better.


[jira] [Commented] (DRILL-6238) Batch sizing for operators

2018-03-17 Thread Paul Rogers (JIRA)

[ https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403827#comment-16403827 ]

Paul Rogers commented on DRILL-6238:


Throwing in a plug here for the [reader-oriented batch size 
design|https://github.com/paul-rogers/drill/wiki/Batch-Handling-Upgrades]. The 
design in this ticket appears to be consistent with the reader-based design. It 
is worth noting that the two sets of work are intended to converge. Eventually, 
the batch size computed here is handed over to the result set loader, which 
will do the busy-work of filling a batch up to the target size. Right now, we 
have to implement it ad hoc for each operator. That said, the overall memory 
calculations *are* unique to each operator, so the work here is very much 
needed, even when the merge eventually occurs.


[jira] [Commented] (DRILL-6238) Batch sizing for operators

2018-03-17 Thread Paul Rogers (JIRA)

[ https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403823#comment-16403823 ]

Paul Rogers commented on DRILL-6238:


Regarding:

bq. The basic idea is to limit size of outgoing batch ... This calculation 
will be different for each operator and is based on ...

Suggestion: the output batch size limit should be identical for every 
operator. Operators can be mixed and matched in many ways, so the only 
constant is that the output of one operator is the input to another. A 
uniform batch size results in a system that is easier to reason about.

This may be implied, but is worth calling out.


[jira] [Commented] (DRILL-6238) Batch sizing for operators

2018-03-13 Thread Pritesh Maker (JIRA)

[ https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398103#comment-16398103 ]

Pritesh Maker commented on DRILL-6238:
--

[~ppenumarthy] I added links to a bunch of issues that are related to batch 
sizing. Please review to see if I missed any.
