[jira] [Updated] (DRILL-6498) Support for EMIT outcome in ExternalSortBatch
[ https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritesh Maker updated DRILL-6498:
--------------------------------
    Labels: ready-to-commit  (was: )

> Support for EMIT outcome in ExternalSortBatch
> ---------------------------------------------
>
>                 Key: DRILL-6498
>                 URL: https://issues.apache.org/jira/browse/DRILL-6498
>             Project: Apache Drill
>          Issue Type: Task
>          Components: Execution - Relational Operators
>            Reporter: Sorabh Hamirwasia
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> With Lateral and Unnest, if a Sort is present in the sub-query, it needs to
> handle the EMIT outcome correctly. This means that when an EMIT is received,
> Sort performs the sort operation on the records buffered so far and produces
> output from them. After the EMIT, Sort should reset its state and start
> working on the next batches of incoming records until an EMIT is seen again.
> For the first cut, Sort will not support spilling in the subquery between
> Lateral and Unnest, since spilling is very unlikely there. The worst case is
> that Lateral gets a batch with only 1 row of data because the repeated-type
> column data is too big. In that case Unnest will produce only 1 output batch,
> and Sort or any other blocking operator needs enough memory to hold at least
> 1 incoming batch anyway. So in the ideal case spilling should not happen. If
> there is an operator between Sort and Unnest that increases the data size,
> Sort might need to spill, but that's not a common case for now.
>
> *Description of Changes:*
> The sort operator currently works as described below. This gives a high-level
> overview of how Sort works and how EMIT support was implemented.
> 1) In the buildSchema phase, Sort creates an empty container with SV NONE and
> sends it downstream.
> 2) After the buildSchema phase it goes into a LOAD phase and keeps calling
> next() on upstream until it sees NONE or there is a failure.
> 3) For each batch it receives, it applies an SV2 if the batch doesn't already
> have one, sorts the batch, and then buffers it after converting it into a
> BatchGroup.InputBatch.
> 4) During buffering it checks for memory pressure and spills as needed.
> 5) Once all the batches are received and it gets NONE from upstream, it
> starts a merge phase.
> 6) In the merge phase it checks whether the merge can happen in memory or
> spilling is needed, and performs the merge accordingly.
> 7) Sort has a concept of SortResults, which represents the different kinds of
> output container that Sort can generate based on the input batches and memory
> conditions. For example, for an in-memory merge the output container of Sort
> is an SV4 container with SortResults of type MergeSortWrapper. For spill and
> merge the container is of SV_NONE type with SortResults of type BatchMerger.
> There are also SortResults types for empty and single batches (not used
> anywhere).
> 8) SortResults is an abstraction that provides output with each next() call
> to it, backed by the output container of the ExternalSortRecordBatch, along
> with the correct recordCount and SV2/SV4 as needed. For example, in the case
> of MergeSortWrapper all the inputs are in memory, and hence all the output is
> also in memory, backed by an SV4. On each next() call the SV4 is updated with
> the start index and length, which tell the caller the record boundary that it
> should consume. For BatchMerger, based on memory pressure and the number of
> records Sort can output with each output container, it fills the output
> container with that many records and sends it downstream.
> 9) The SortResults abstraction is also such that, at the beginning of the
> merge phase, the output container held by SortResults is cleared and later
> re-initialized after the merge is completed.
> Until now, since Sort is a blocking operator, it cleared the output
> container's ValueVectors after the buildSchema phase and in the load phase.
> Later it creates the final output container (with ValueVector objects) after
> it has seen all the incoming data. The very first output batch is always
> returned with OK_NEW_SCHEMA so that the downstream operator can set up the
> correct SV mode and schema from the first output batch, since the schema
> returned in the buildSchema phase was a dummy one. So the vector references
> maintained by the downstream operator from the buildSchema phase are updated
> with the vector references in the first output batch.
> With EMIT, however, Sort will go into the load phase multiple times, and
> hence we cannot clear the output container of Sort after each EMIT boundary.
> If we do that, then the downstream operator which is holding references to
> the ExternalSort output container ValueVector will become
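To make step 8 of the description above more concrete, here is a minimal Java sketch of how an in-memory merge can be exposed through an SV4-style indirection that is advanced by a start index and a length on each next() call. It is an illustration only, not Drill's code: the class Sv4WindowSketch, its methods, and the use of plain int arrays as "batches" are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Toy model of an in-memory merge read through an SV4-like index (hypothetical names). */
class Sv4WindowSketch {
    private final List<int[]> batches;                   // buffered input batches, never copied or reordered
    private final List<int[]> index = new ArrayList<>(); // entries are {batch, row}, kept in sorted order
    private int start = 0;                               // first index entry of the current output window
    private int length = 0;                              // number of entries in the current output window

    Sv4WindowSketch(List<int[]> batches) {
        this.batches = batches;
        for (int b = 0; b < batches.size(); b++) {
            for (int r = 0; r < batches.get(b).length; r++) {
                index.add(new int[] {b, r});
            }
        }
        // Sort the indirection, not the data: the rows stay where they were buffered.
        index.sort(Comparator.comparingInt(ref -> batches.get(ref[0])[ref[1]]));
    }

    /** Moves the window forward by up to maxRows entries; returns false once nothing is left. */
    boolean next(int maxRows) {
        start += length;
        length = Math.max(0, Math.min(maxRows, index.size() - start));
        return length > 0;
    }

    /** Reads the values covered by the current window through the index. */
    List<Integer> currentWindow() {
        List<Integer> out = new ArrayList<>();
        for (int i = start; i < start + length; i++) {
            int[] ref = index.get(i);
            out.add(batches.get(ref[0])[ref[1]]);
        }
        return out;
    }
}
```

In this toy model the consumer only ever sees the window bounds change between calls; the buffered data and the index object stay in place, which is the property the description attributes to the SV4-backed output.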
[jira] [Updated] (DRILL-6498) Support for EMIT outcome in ExternalSortBatch
[ https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sorabh Hamirwasia updated DRILL-6498:
------------------------------------
    Description:
With Lateral and Unnest, if a Sort is present in the sub-query, it needs to handle the EMIT outcome correctly. This means that when an EMIT is received, Sort performs the sort operation on the records buffered so far and produces output from them. After the EMIT, Sort should reset its state and start working on the next batches of incoming records until an EMIT is seen again.

For the first cut, Sort will not support spilling in the subquery between Lateral and Unnest, since spilling is very unlikely there. The worst case is that Lateral gets a batch with only 1 row of data because the repeated-type column data is too big. In that case Unnest will produce only 1 output batch, and Sort or any other blocking operator needs enough memory to hold at least 1 incoming batch anyway. So in the ideal case spilling should not happen. If there is an operator between Sort and Unnest that increases the data size, Sort might need to spill, but that's not a common case for now.

*Description of Changes:*

The sort operator currently works as described below. This gives a high-level overview of how Sort works and how EMIT support was implemented.

1) In the buildSchema phase, Sort creates an empty container with SV NONE and sends it downstream.
2) After the buildSchema phase it goes into a LOAD phase and keeps calling next() on upstream until it sees NONE or there is a failure.
3) For each batch it receives, it applies an SV2 if the batch doesn't already have one, sorts the batch, and then buffers it after converting it into a BatchGroup.InputBatch.
4) During buffering it checks for memory pressure and spills as needed.
5) Once all the batches are received and it gets NONE from upstream, it starts a merge phase.
6) In the merge phase it checks whether the merge can happen in memory or spilling is needed, and performs the merge accordingly.
7) Sort has a concept of SortResults, which represents the different kinds of output container that Sort can generate based on the input batches and memory conditions. For example, for an in-memory merge the output container of Sort is an SV4 container with SortResults of type MergeSortWrapper. For spill and merge the container is of SV_NONE type with SortResults of type BatchMerger. There are also SortResults types for empty and single batches (not used anywhere).
8) SortResults is an abstraction that provides output with each next() call to it, backed by the output container of the ExternalSortRecordBatch, along with the correct recordCount and SV2/SV4 as needed. For example, in the case of MergeSortWrapper all the inputs are in memory, and hence all the output is also in memory, backed by an SV4. On each next() call the SV4 is updated with the start index and length, which tell the caller the record boundary that it should consume. For BatchMerger, based on memory pressure and the number of records Sort can output with each output container, it fills the output container with that many records and sends it downstream.
9) The SortResults abstraction is also such that, at the beginning of the merge phase, the output container held by SortResults is cleared and later re-initialized after the merge is completed.

Until now, since Sort is a blocking operator, it cleared the output container's ValueVectors after the buildSchema phase and in the load phase. Later it creates the final output container (with ValueVector objects) after it has seen all the incoming data. The very first output batch is always returned with OK_NEW_SCHEMA so that the downstream operator can set up the correct SV mode and schema from the first output batch, since the schema returned in the buildSchema phase was a dummy one. So the vector references maintained by the downstream operator from the buildSchema phase are updated with the vector references in the first output batch.

With EMIT, however, Sort will go into the load phase multiple times, and hence we cannot clear the output container of Sort after each EMIT boundary. If we do that, the references that the downstream operator holds to the ExternalSort output container's ValueVectors will become invalid, and Sort would also have to send OK_NEW_SCHEMA with the first output batch of each EMIT boundary. To avoid that, we need to make sure that at each EMIT boundary Sort goes from the load phase to the merge phase to produce output, and that the output container is not cleared in either of these phases.

*To achieve the above and keep the code changes to a minimum, the following approach is used:*

1) A wrapper output container (_outputWrapperContainer_) and a wrapper SV4 (_outputSV4_) are created.
2) This outputWrapperContainer is provided to the SortResults instead of the actual container which carries the output of
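The stale-reference problem described above can be shown with a small Java toy model. It does not reproduce Drill's VectorContainer or the outputWrapperContainer mechanism (the description is cut off before it explains how the wrapper's contents reach the real output container); it only contrasts replacing a vector object with refilling it in place. All class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: why replacing output vectors breaks references held downstream (hypothetical names). */
class StableContainerSketch {
    /** Stand-in for a value vector: a named, mutable column of ints. */
    static final class Vector {
        final String name;
        final List<Integer> values = new ArrayList<>();

        Vector(String name) {
            this.name = name;
        }
    }

    /** Stand-in for the sort operator's output container with a single column. */
    static final class Container {
        private Vector vector = new Vector("col");

        Vector vector() {
            return vector;
        }

        /** Clears by replacement: any reference handed out earlier now points at a dead vector. */
        void clearAndRebuild(List<Integer> sorted) {
            vector = new Vector("col");
            vector.values.addAll(sorted);
        }

        /** Refills in place: the vector object survives, only its contents change. */
        void refillInPlace(List<Integer> sorted) {
            vector.values.clear();
            vector.values.addAll(sorted);
        }
    }
}
```

A downstream operator that cached `container.vector()` during schema setup keeps seeing fresh data after `refillInPlace`, but after `clearAndRebuild` it would have to be told to look up the vector again, which is the role OK_NEW_SCHEMA plays in the description.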
[jira] [Updated] (DRILL-6498) Support for EMIT outcome in ExternalSortBatch
[ https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritesh Maker updated DRILL-6498:
--------------------------------
    Reviewer: Parth Chandra

> Support for EMIT outcome in ExternalSortBatch
> ---------------------------------------------
>
>                 Key: DRILL-6498
>                 URL: https://issues.apache.org/jira/browse/DRILL-6498
>             Project: Apache Drill
>          Issue Type: Task
>          Components: Execution - Relational Operators
>            Reporter: Sorabh Hamirwasia
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>             Fix For: 1.14.0
>
>
> With Lateral and Unnest, if a Sort is present in the sub-query, it needs to
> handle the EMIT outcome correctly. This means that when an EMIT is received,
> Sort performs the sort operation on the records buffered so far and produces
> output from them. After the EMIT, Sort should reset its state and start
> working on the next batches of incoming records until an EMIT is seen again.
> For the first cut, Sort will not support spilling in the subquery between
> Lateral and Unnest, since spilling is very unlikely there. The worst case is
> that Lateral gets a batch with only 1 row of data because the repeated-type
> column data is too big. In that case Unnest will produce only 1 output batch,
> and Sort or any other blocking operator needs enough memory to hold at least
> 1 incoming batch anyway. So in the ideal case spilling should not happen. If
> there is an operator between Sort and Unnest that increases the data size,
> Sort might need to spill, but that's not a common case for now.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
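The EMIT behaviour summarized in the issue description can be sketched as a small Java loop: buffer incoming rows, sort and output them when an EMIT arrives, then reset and continue. This is a toy model under stated assumptions, not the ExternalSortBatch implementation: the Outcome enum is a reduced stand-in for the upstream outcomes, rows are plain integers, spilling is ignored, and it is assumed that the batch arriving with EMIT belongs to the boundary it closes. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of a blocking sort that produces one sorted output per EMIT boundary (hypothetical names). */
class EmitSortSketch {
    /** Reduced set of upstream outcomes, modelled on the issue description. */
    enum Outcome { OK, EMIT, NONE }

    /** An incoming batch together with the outcome it arrived with. */
    static final class Incoming {
        final Outcome outcome;
        final List<Integer> rows;

        Incoming(Outcome outcome, List<Integer> rows) {
            this.outcome = outcome;
            this.rows = rows;
        }
    }

    /** Returns one sorted list per EMIT boundary, plus a final one for rows left over at NONE. */
    static List<List<Integer>> sortPerEmitBoundary(List<Incoming> upstream) {
        List<List<Integer>> outputs = new ArrayList<>();
        List<Integer> buffered = new ArrayList<>();
        for (Incoming in : upstream) {
            if (in.outcome == Outcome.NONE) {
                break; // upstream is done; leftovers are flushed below
            }
            buffered.addAll(in.rows); // load phase: keep buffering
            if (in.outcome == Outcome.EMIT) {
                Collections.sort(buffered);             // merge phase for this boundary
                outputs.add(new ArrayList<>(buffered)); // hand a copy downstream
                buffered.clear();                       // reset state for the next boundary
            }
        }
        if (!buffered.isEmpty()) {
            Collections.sort(buffered);
            outputs.add(new ArrayList<>(buffered));
        }
        return outputs;
    }
}
```

For example, feeding OK [3, 1], EMIT [2], OK [9], EMIT [7, 8], NONE through sortPerEmitBoundary yields two outputs, [1, 2, 3] and [7, 8, 9]: rows are never sorted across an EMIT boundary.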