[jira] [Updated] (DRILL-6498) Support for EMIT outcome in ExternalSortBatch

2018-06-27 Thread Pritesh Maker (JIRA)


 [ 
https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritesh Maker updated DRILL-6498:
-
Labels: ready-to-commit  (was: )

> Support for EMIT outcome in ExternalSortBatch
> -
>
> Key: DRILL-6498
> URL: https://issues.apache.org/jira/browse/DRILL-6498
> Project: Apache Drill
>  Issue Type: Task
>  Components: Execution - Relational Operators
>Reporter: Sorabh Hamirwasia
>Assignee: Sorabh Hamirwasia
>Priority: Major
>  Labels: ready-to-commit
> Fix For: 1.14.0
>
>
> With Lateral and Unnest, if Sort is present in the sub-query then it needs to 
> handle the EMIT outcome correctly. This means that when an EMIT is received, 
> Sort should perform the sort operation on the records buffered so far and 
> produce output from them. After EMIT, Sort should reset its state and work on 
> the next batches of incoming records until another EMIT is seen.
> For the first cut, Sort will not support spilling in the subquery between 
> Lateral and Unnest, since spilling is very unlikely there. The worst case is 
> that Lateral gets a batch with only 1 row of data because the repeated-type 
> column data is too big. In that case Unnest will produce only 1 output batch, 
> and Sort (or any other blocking operator) needs enough memory to hold at least 
> 1 incoming batch anyway. So in the common case spilling should not happen. If 
> there is an operator between Sort and Unnest that increases the data size, 
> Sort might need to spill, but that is not a common case for now.
>  
> *Description of Changes:*
>  The sort operator is currently implemented as described below. This provides 
> a general high-level overview of how Sort works and how EMIT support was 
> implemented.
> 1) In the buildSchema phase, Sort creates an empty container with SV mode NONE 
> and sends it downstream.
> 2) After the buildSchema phase it goes into a LOAD phase and keeps calling 
> next() on its upstream until it sees NONE or there is a failure.
> 3) For each batch it receives, it applies an SV2 (if the batch does not 
> already have one), sorts the batch, and then buffers it after converting it 
> into a BatchGroup.InputBatch.
> 4) During buffering it watches for memory pressure and spills as needed.
> 5) Once all batches are received and it gets NONE from upstream, it starts the 
> merge phase.
> 6) In the merge phase it checks whether the merge can happen in memory or 
> whether spilling is needed, and performs the merge accordingly.
> 7) Sort has a concept of SortResults, which represents the different kinds of 
> output containers Sort can generate based on the input batches and memory 
> conditions. For example, for an in-memory merge the output container is an SV4 
> container with a SortResults of type MergeSortWrapper; for a spill-and-merge 
> the container is of SV_NONE type with a SortResults of type BatchMerger. There 
> are also SortResults types for empty and single batches (not used anywhere).
> 8) SortResults provides an abstraction that yields an output result with each 
> next() call, backed by the output container of the ExternalSortBatch along 
> with the correct recordCount and SV2/SV4 as needed. For example, with 
> MergeSortWrapper all inputs are in memory, so all output is also in memory and 
> backed by an SV4; each next() call updates the SV4 with the start index and 
> length, which tells the caller which record boundary it should consume. With 
> BatchMerger, based on memory pressure and the number of records Sort can 
> output per container, it fills the output container with that many records and 
> sends it downstream.
> 9) The SortResults abstraction also works such that, at the beginning of the 
> merge phase, the output container held by SortResults is cleared and later 
> re-initialized after the merge is completed.
> Currently, since Sort is a blocking operator, it clears the output container's 
> ValueVectors after the buildSchema phase and during the load phase, and later 
> creates the final output container (with ValueVector objects) after it has 
> seen all the incoming data. The very first output batch is always returned 
> with OK_NEW_SCHEMA so that the downstream operator can set up the correct SV 
> mode and schema from the first output batch, since the schema returned in the 
> buildSchema phase was a dummy one. The vector references that the downstream 
> operator captured in the buildSchema phase are thus updated with the vector 
> references in the first output batch.
> With EMIT, however, Sort will go into the load phase multiple times, so we 
> cannot clear Sort's output container at each EMIT boundary. If we did, the 
> ValueVector references that the downstream operator holds to ExternalSort's 
> output container would become 

[jira] [Updated] (DRILL-6498) Support for EMIT outcome in ExternalSortBatch

2018-06-21 Thread Sorabh Hamirwasia (JIRA)


 [ 
https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sorabh Hamirwasia updated DRILL-6498:
-
Description: 
With Lateral and Unnest, if Sort is present in the sub-query then it needs to 
handle the EMIT outcome correctly. This means that when an EMIT is received, 
Sort should perform the sort operation on the records buffered so far and 
produce output from them. After EMIT, Sort should reset its state and work on 
the next batches of incoming records until another EMIT is seen.
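
As a concrete illustration of this EMIT contract, here is a minimal, 
self-contained sketch. It is not the actual ExternalSortBatch code: the class, 
method names, and int[] "batches" are simplified stand-ins for Drill's iterator 
protocol and value-vector machinery. Batches are buffered while the upstream 
keeps returning OK; an EMIT (or the final NONE) triggers a sort over only what 
was buffered in that boundary, after which the state is reset.

{code:java}
// Illustrative sketch only -- not the real ExternalSortBatch. Outcome is a
// simplified subset of Drill's IterOutcome; batches are plain int arrays.
import java.util.ArrayList;
import java.util.List;

class EmitAwareSortSketch {
  enum Outcome { OK, EMIT, NONE }

  private final List<int[]> buffered = new ArrayList<>();   // plays the role of the buffered input batches

  /** Called once per upstream batch together with the outcome that accompanied it. */
  List<Integer> onUpstream(int[] batch, Outcome outcome) {
    if (batch != null) {
      buffered.add(batch);                                  // buffer the incoming batch
    }
    if (outcome == Outcome.EMIT || outcome == Outcome.NONE) {
      List<Integer> out = sortBuffered();                   // produce output for this boundary only
      buffered.clear();                                     // reset state before the next boundary
      return out;
    }
    return null;                                            // still loading; nothing to emit yet
  }

  private List<Integer> sortBuffered() {
    List<Integer> all = new ArrayList<>();
    for (int[] b : buffered) {
      for (int v : b) {
        all.add(v);
      }
    }
    all.sort(null);                                         // sort only what this boundary buffered
    return all;
  }

  public static void main(String[] args) {
    EmitAwareSortSketch sort = new EmitAwareSortSketch();
    sort.onUpstream(new int[]{3, 1}, Outcome.OK);
    System.out.println(sort.onUpstream(new int[]{2}, Outcome.EMIT));    // [1, 2, 3] -- first boundary
    System.out.println(sort.onUpstream(new int[]{9, 5}, Outcome.EMIT)); // [5, 9]    -- state was reset
  }
}
{code}

With spilling disabled inside the Lateral/Unnest subquery (as stated below), 
each EMIT boundary can be handled entirely in memory in this fashion.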

For the first cut, Sort will not support spilling in the subquery between 
Lateral and Unnest, since spilling is very unlikely there. The worst case is 
that Lateral gets a batch with only 1 row of data because the repeated-type 
column data is too big. In that case Unnest will produce only 1 output batch, 
and Sort (or any other blocking operator) needs enough memory to hold at least 
1 incoming batch anyway. So in the common case spilling should not happen. If 
there is an operator between Sort and Unnest that increases the data size, Sort 
might need to spill, but that is not a common case for now.

 

*Description of Changes:*
 The sort operator is currently implemented as described below. This provides a 
general high-level overview of how Sort works and how EMIT support was 
implemented.

1) In the buildSchema phase, Sort creates an empty container with SV mode NONE 
and sends it downstream.

2) After the buildSchema phase it goes into a LOAD phase and keeps calling 
next() on its upstream until it sees NONE or there is a failure.

3) For each batch it receives, it applies an SV2 (if the batch does not already 
have one), sorts the batch, and then buffers it after converting it into a 
BatchGroup.InputBatch.

4) During buffering it watches for memory pressure and spills as needed.

5) Once all batches are received and it gets NONE from upstream, it starts the 
merge phase.

6) In the merge phase it checks whether the merge can happen in memory or 
whether spilling is needed, and performs the merge accordingly.

7) Sort has a concept of SortResults, which represents the different kinds of 
output containers Sort can generate based on the input batches and memory 
conditions. For example, for an in-memory merge the output container is an SV4 
container with a SortResults of type MergeSortWrapper; for a spill-and-merge 
the container is of SV_NONE type with a SortResults of type BatchMerger. There 
are also SortResults types for empty and single batches (not used anywhere).

8) SortResults provides an abstraction that yields an output result with each 
next() call, backed by the output container of the ExternalSortBatch along with 
the correct recordCount and SV2/SV4 as needed. For example, with 
MergeSortWrapper all inputs are in memory, so all output is also in memory and 
backed by an SV4; each next() call updates the SV4 with the start index and 
length, which tells the caller which record boundary it should consume. With 
BatchMerger, based on memory pressure and the number of records Sort can output 
per container, it fills the output container with that many records and sends 
it downstream (see the sketch after this list).

9) The SortResults abstraction also works such that, at the beginning of the 
merge phase, the output container held by SortResults is cleared and later 
re-initialized after the merge is completed.
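
To make the SortResults idea from points 7) and 8) more tangible, below is a 
hedged sketch of such an abstraction: one shared sorted result plus an iterator 
whose next() call exposes the next window of output via a start index and 
record count, much like MergeSortWrapper advancing an SV4. The interface and 
class names are illustrative, not the actual Drill types, which operate on 
ValueVectors and SV2/SV4 selection vectors rather than plain counters.

{code:java}
// Hedged sketch of a SortResults-like abstraction; names and types are illustrative.
interface SortResultsSketch {
  boolean next();          // advance to the next output window; false when exhausted
  int getStartIndex();     // start of the current window (plays the role of the SV4 start)
  int getRecordCount();    // number of records in the current window
}

// In-memory flavour: all sorted data is already materialized, and next() just moves
// a (start, length) window over it -- analogous to updating an SV4 on each call.
class InMemoryResultsSketch implements SortResultsSketch {
  private final int totalRecords;
  private final int batchSize;
  private int start = -1;
  private int count;

  InMemoryResultsSketch(int totalRecords, int batchSize) {
    this.totalRecords = totalRecords;
    this.batchSize = batchSize;
  }

  @Override public boolean next() {
    int newStart = (start < 0) ? 0 : start + count;
    if (newStart >= totalRecords) { return false; }
    start = newStart;
    count = Math.min(batchSize, totalRecords - start);   // last window may be short
    return true;
  }

  @Override public int getStartIndex() { return start; }
  @Override public int getRecordCount() { return count; }
}
{code}

For a spill-and-merge, the same kind of interface would instead be backed by 
batches streamed back from disk, which is the role BatchMerger plays in the 
description above.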

Currently, since Sort is a blocking operator, it clears the output container's 
ValueVectors after the buildSchema phase and during the load phase, and later 
creates the final output container (with ValueVector objects) after it has seen 
all the incoming data. The very first output batch is always returned with 
OK_NEW_SCHEMA so that the downstream operator can set up the correct SV mode 
and schema from the first output batch, since the schema returned in the 
buildSchema phase was a dummy one. The vector references that the downstream 
operator captured in the buildSchema phase are thus updated with the vector 
references in the first output batch.
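
A small sketch of the downstream side of this handshake, under the assumption 
described above that vector references are (re)captured on OK_NEW_SCHEMA and 
then reused on later batches; the types here are placeholders, not Drill 
classes.

{code:java}
// Illustrative sketch of the downstream side of the contract described above:
// vector references are (re)bound only on OK_NEW_SCHEMA and reused afterwards.
// Object[] stands in for real ValueVector references; not actual Drill code.
class DownstreamSketch {
  private Object[] cachedVectorRefs;          // references into the sort's output container

  void onBatch(String outcome, Object[] sortOutputVectors) {
    if ("OK_NEW_SCHEMA".equals(outcome)) {
      cachedVectorRefs = sortOutputVectors;   // rebind: schema (and vector objects) may have changed
    }
    // On OK or EMIT the cached references are reused as-is, so the sort must keep
    // handing out the *same* vector objects or this cache silently goes stale.
    process(cachedVectorRefs);
  }

  private void process(Object[] vectors) { /* consume the rows exposed by these vectors */ }
}
{code}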

With EMIT, however, Sort will go into the load phase multiple times, so we 
cannot clear Sort's output container at each EMIT boundary. If we did, the 
ValueVector references that the downstream operator holds to ExternalSort's 
output container would become invalid, and Sort would also have to send 
OK_NEW_SCHEMA with the first output batch of each EMIT boundary. To avoid that, 
we need to make sure that within each EMIT boundary Sort goes from the load 
phase to the merge phase to produce output, without clearing the output 
container in either phase.
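
The sketch below illustrates why the output vectors must be reused rather than 
recreated at each EMIT boundary: a reference cached downstream stays valid only 
if the sort refills the same vector objects in place. The stub class is purely 
illustrative and only object identity matters here.

{code:java}
// Hedged sketch of the container-reuse requirement across EMIT boundaries.
// IntVectorStub stands in for a Drill ValueVector; not actual Drill code.
import java.util.Arrays;

class ContainerReuseSketch {
  static class IntVectorStub {
    int[] data = new int[0];
    void reset(int[] newData) { data = newData; }   // reuse the same object, new contents
  }

  public static void main(String[] args) {
    IntVectorStub outputVector = new IntVectorStub();
    IntVectorStub downstreamRef = outputVector;     // downstream caches this reference once

    // EMIT boundary 1: refill the existing vector in place.
    outputVector.reset(new int[]{1, 2, 3});
    System.out.println(Arrays.toString(downstreamRef.data));   // [1, 2, 3] -- still valid

    // If sort instead allocated a brand-new vector for the next boundary...
    outputVector = new IntVectorStub();
    outputVector.reset(new int[]{5, 9});
    System.out.println(Arrays.toString(downstreamRef.data));   // stale: still [1, 2, 3]
    // ...the downstream reference silently points at the old data, which is exactly
    // why the output container is not cleared and recreated at each EMIT boundary.
  }
}
{code}

This constraint is what motivates the wrapper-container approach described 
next.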

*To achieve the above and keep the code changes to a minimum, the following 
approach is used:*

1) A wrapper output container (_outputWrapperContainer_) and a wrapper SV4 
(_outputSV4_) are created.

2) This outputWrapperContainer is provided to SortResults instead of the actual 
container which carries the output of 

[jira] [Updated] (DRILL-6498) Support for EMIT outcome in ExternalSortBatch

2018-06-15 Thread Pritesh Maker (JIRA)


 [ 
https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritesh Maker updated DRILL-6498:
-
Reviewer: Parth Chandra

> Support for EMIT outcome in ExternalSortBatch
> -
>
> Key: DRILL-6498
> URL: https://issues.apache.org/jira/browse/DRILL-6498
> Project: Apache Drill
>  Issue Type: Task
>  Components: Execution - Relational Operators
>Reporter: Sorabh Hamirwasia
>Assignee: Sorabh Hamirwasia
>Priority: Major
> Fix For: 1.14.0
>
>
> With Lateral and Unnest, if Sort is present in the sub-query then it needs to 
> handle the EMIT outcome correctly. This means that when an EMIT is received, 
> Sort should perform the sort operation on the records buffered so far and 
> produce output from them. After EMIT, Sort should reset its state and work on 
> the next batches of incoming records until another EMIT is seen.
> For the first cut, Sort will not support spilling in the subquery between 
> Lateral and Unnest, since spilling is very unlikely there. The worst case is 
> that Lateral gets a batch with only 1 row of data because the repeated-type 
> column data is too big. In that case Unnest will produce only 1 output batch, 
> and Sort (or any other blocking operator) needs enough memory to hold at least 
> 1 incoming batch anyway. So in the common case spilling should not happen. If 
> there is an operator between Sort and Unnest that increases the data size, 
> Sort might need to spill, but that is not a common case for now.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)