Re: Batch Sizing for Parquet Flat Reader

2018-02-11 Thread Padma Penumarthy
With the average row size method, since I know the number of rows and the 
average size of each column, I am planning to use that information to allocate 
the required memory for each vector upfront. This should help avoid copying 
every time we double a vector and also improve memory utilization.
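
A rough sketch of the idea (the class and helper names below are illustrative, 
not Drill's actual allocation API):

class UpfrontBatchAllocator {
  // Netty hands out power-of-two buffers, so round each request up.
  static int roundUpToPowerOfTwo(int bytes) {
    int highest = Integer.highestOneBit(bytes);
    return (highest == bytes) ? bytes : highest << 1;
  }

  // Bytes to reserve up front for one column's data buffer, sized once from
  // the known row count and average column width rather than starting small
  // and doubling (with a copy) as values arrive.
  static int columnAllocation(int rowCount, double avgColumnWidth) {
    return roundUpToPowerOfTwo((int) Math.ceil(rowCount * avgColumnWidth));
  }
}

For example, 4,096 rows at an average of 20 bytes per value reserves 128 KB in 
a single allocation rather than growing the vector through repeated doublings.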

Thanks
Padma


> On Feb 11, 2018, at 3:44 PM, Paul Rogers  wrote:
> 
> One more thought:
>>> 3) Assuming that you go with the average batch size calculation approach,
> 
> The average batch size approach is a quick and dirty approach for non-leaf 
> operators that can observe an incoming batch to estimate row width. Because 
> Drill batches are large, the law of large numbers means that the average of a 
> large input batch is likely to be a good estimator for the average size of a 
> large output batch.
> Note that this works only because non-leaf operators have an input batch to 
> sample. Leaf operators (readers) do not have this luxury. Hence the result 
> set loader uses the actual accumulated size for the current batch.
> Also note that the average row method, while handy, is not optimal. It will, 
> in general, result in greater internal fragmentation than the result set 
> loader. Why? The result set loader packs vectors right up to the point where 
> the largest would overflow. The average row method works at the aggregate 
> level and will likely result in wasted space (internal fragmentation) in the 
> largest vector. Said another way, with the average row size method, we can 
> usually pack in a few more rows before the batch actually fills, and so we 
> end up with batches with lower "density" than the optimal. This is important 
> when the consuming operator is a buffering one such as sort.
> The key reason Padma is using the quick & dirty average row size method is 
> not that it is ideal (it is not), but rather that it is, in fact, quick.
> We do want to move to the result set loader over time so we get improved 
> memory utilization. And, it is the only way to control row size in readers 
> such as CSV or JSON in which we have no size information until we read the 
> data.
> - Paul   



Re: Batch Sizing for Parquet Flat Reader

2018-02-11 Thread Paul Rogers
Hi Salim.
Thanks much for the detailed explanation! You clearly have developed a deep 
understanding of the Parquet code and its impact on CPU and I/O performance. My 
comments are more from a holistic perspective on Drill as a whole.
Far too much to discuss on the dev list. I've added your comments, and my 
response, to DRILL-6147.
The key question is: what is our end goal and what path gets us there with the 
least effort? Perhaps those design doc updates Parth requested could spell that 
out a bit more.
Thanks,
- Paul


On Sunday, February 11, 2018, 2:36:14 PM PST, salim achouche 
 wrote:  
 
Thanks Paul for your feedback! Let me try to answer some of your questions / 
comments:

Duplicate Implementation
- I am not contemplating two different implementations; one for Parquet and 
another for the rest of the code
- Instead, I am reacting to the fact that we have two different processing 
patterns Row Oriented and Columnar
- The goal is to offer both strategies depending on the operator

Complex vs. Flat Parquet Readers
- The Complex and Flat Parquet readers are quite different
- I presume, for the sake of performance, we can enhance our SQL capabilities 
so that the Flat Parquet reader can be invoked more frequently

Predicate Pushdown
- The reason I invoked Predicate Pushdown within the document is to help the 
analysis:
  o Notice how Record Batch materialization could involve many more pages
  o A solution that relies mainly on the current set of pages (one per column) 
might pay a heavy IO price without much to show for it
      + By waiting for all columns to have at least one page loaded so that 
upfront stats are gathered 
      + Batch memory is then divided optimally across columns and the current 
batch size is computed
      + Unfortunately, such logic will fail if more pages are involved than the 
ones taken into consideration
  o Example -
      + Two variable length columns c1 and c2
      + Reader waits for two pages P1-1 and P2-1 so that we a) allocate memory 
optimally across c1 and c2 and b) compute a batch size that will minimize 
overflow logic
      + Assume, because of data length skew or predicate pushdown, that more 
pages are involved in loading the batch
      + For c1: {P1-1, P1-2, P1-3, P1-4}, c2: {P2-1, P2-2} 
      + It is now highly possible that overflow logic might not be optimal 
since only two pages' statistics were considered instead of six

 - I have added new logic to the ScanBatch so as to log (on demand) extra batch 
statistics which will help us assess the efficiency of the batch sizing 
strategy; I will add this information to the document when this sub-task is done


Implementation Strategy
- DRILL-6147's mission is to implement batch sizing for Flat Parquet with minimal 
overhead
- This will also help us test this functionality for end-to-end cases (whole 
query)
- My next task (after DRILL-6147) is to incorporate your framework with Parquet 
- I'll a) enhance the framework to support columnar processing and b) 
refactor the Parquet code to use the framework
- I agree there might be some duplicate effort but I really believe this will 
be minimal
- DRILL-6147 is no more than one week of research & analysis and one week of 
implementation

Regards,
Salim



> On Feb 11, 2018, at 1:35 PM, Paul Rogers  wrote:
> 
> Hi All,
> Perhaps this topic needs just a bit more thought and discussion to avoid 
> working at cross purposes. I've outlined the issues, and a possible path 
> forward, in a comment to DRILL-6147.
> Quick summary: creating a second batch size implementation just for Parquet 
> will be very difficult once we handle all the required use cases as spelled 
> out in the comment. We'd want to be very sure that we do, indeed, want to 
> duplicate this effort before we head down that route. Duplicating the effort 
> means repeating all the work done over the last six months to make the 
> original result set loader work, and the future work needed to maintain two 
> parallel systems. This is not a decision to make by default.
> Thanks,
> - Paul
> 
>    On Sunday, February 11, 2018, 12:10:58 AM PST, Parth Chandra 
> wrote:  
> 
> Thanks Salim.
> Can you add this to the JIRA/design doc. Also, I would venture to suggest
> that the section on predicate pushdown can be made clearer.
> Also, since you're proposing the average batch size approach with overflow
> handling, some detail on the proposed changes to the framework would be
> useful in the design doc. (Perhaps pseudo code and affected classes.)
> Essentially some guarantees provided by the framework will change and this
> may affect (or not) the existing usage. These should be enumerated in the
> design doc.
> 
> 
  

Re: Batch Sizing for Parquet Flat Reader

2018-02-11 Thread Paul Rogers
Parth notes:

Also note that memory allocations by Netty greater than the 16MB chunk size are 
returned to the OS when the memory is free'd. Both this document and the 
original document on memory fragmentation state incorrectly that such memory is 
not released back to the OS. A quick thought experiment - where does this 
memory go if it is not released back to the OS?

This is true. If the original docs said otherwise, then it is an error for 
which I apologize. If this were not true, we'd have lots of memory leaks, which 
we'd have found and fixed. So, clearly memory is returned.
It is not returning memory to the OS that is the issue. Rather, it is the 
fragmentation that occurs when most memory is on the Netty free list and we 
want to get a large chunk from the OS. We can run out of memory even when lots 
is free (in Netty).
The original jemalloc paper talks about an algorithm to return unused memory to 
the OS, perhaps we can add that to our own Netty-based allocator.
We'd want to be clever, however, because allocations from the OS are 1000 times 
slower than allocations from the Netty free list, or at least that was true in 
a prototype I did a year ago on the Mac.
Further, in the general case, even Netty is not a panacea. Even if we keep 
blocks to 16 MB and smaller, doing random sized allocations in random order 
will cause Netty fragmentation: we might want a 16 MB block, half of memory 
might be free, but due to historical alloc/free patterns, all memory is free as 
8 MB blocks and so allocation fails.
Java avoids this issue because it does compaction of free heap space. I'd guess 
we don't really want to try to implement that for direct memory.
This is why DBs generally use fixed-size allocations: it completely avoids 
memory fragmentation issues. One of the goals of the recent "result set loader" 
work is to encapsulate all vector accesses in a higher-level abstraction so 
that, eventually, we can try alternative memory layouts with minimal impact on 
the rest of Drill code. (The column reader and writer layer isolates code from 
actual vector APIs and memory layout.)
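
As a toy illustration (not Drill code) of why uniform blocks sidestep the 
problem: when every free block can satisfy every request, no alloc/free 
history can strand memory the way variable-size requests can.

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class FixedBlockPool {
  static final int BLOCK_SIZE = 8 * 1024 * 1024;  // one uniform block size
  private final Deque<ByteBuffer> freeList = new ArrayDeque<>();

  ByteBuffer allocate() {
    ByteBuffer block = freeList.pollFirst();
    // Any free block fits any request; go to the OS only when the list is empty.
    return (block != null) ? block : ByteBuffer.allocateDirect(BLOCK_SIZE);
  }

  void release(ByteBuffer block) {
    block.clear();
    freeList.addFirst(block);  // cache for reuse; OS allocation is far slower
  }
}

The free list also keeps the slow OS allocation path off the hot path, which 
is the same trade-off the Netty allocator makes.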
Thanks,
- Paul  

Re: Batch Sizing for Parquet Flat Reader

2018-02-11 Thread salim achouche
Paul,

I cannot thank you enough for your help and guidance! You are right that 
columnar readers will have a harder time balancing resource requirements and 
performance. Nevertheless, DRILL-6147 is a starting point; it should allow us 
to gain knowledge and accordingly refine our strategy as we go.

FYI - On a completely different topic: I was working on an EBF regarding the 
Parquet complex reader (though the bug was midstream). I was surprised by the 
level of overhead associated with nested data processing; literally, the code 
was jumping from one column/level to another just to process a single value. 
There was a comment to perform such processing in a bulk manner (which I agree 
with). The moral of the story is that Drill is dealing with complex use-cases 
that haven't been dealt with before (at least not with great success); as can 
be seen, we started with simpler solutions only to realize they are 
inefficient. What is needed is to spend time understanding such use-cases and 
incrementally work to address those shortcomings.

Regards,
Salim


> On Feb 11, 2018, at 3:44 PM, Paul Rogers wrote:
> 
> One more thought:
>>> 3) Assuming that you go with the average batch size calculation approach,
> 
> The average batch size approach is a quick and dirty approach for non-leaf 
> operators that can observe an incoming batch to estimate row width. Because 
> Drill batches are large, the law of large numbers means that the average of a 
> large input batch is likely to be a good estimator for the average size of a 
> large output batch.
> Note that this works only because non-leaf operators have an input batch to 
> sample. Leaf operators (readers) do not have this luxury. Hence the result 
> set loader uses the actual accumulated size for the current batch.
> Also note that the average row method, while handy, is not optimal. It will, 
> in general, result in greater internal fragmentation than the result set 
> loader. Why? The result set loader packs vectors right up to the point where 
> the largest would overflow. The average row method works at the aggregate 
> level and will likely result in wasted space (internal fragmentation) in the 
> largest vector. Said another way, with the average row size method, we can 
> usually pack in a few more rows before the batch actually fills, and so we 
> end up with batches with lower "density" than the optimal. This is important 
> when the consuming operator is a buffering one such as sort.
> The key reason Padma is using the quick & dirty average row size method is 
> not that it is ideal (it is not), but rather that it is, in fact, quick.
> We do want to move to the result set loader over time so we get improved 
> memory utilization. And, it is the only way to control row size in readers 
> such as CSV or JSON in which we have no size information until we read the 
> data.
> - Paul   



Re: Batch Sizing for Parquet Flat Reader

2018-02-11 Thread Paul Rogers
One more thought:
> > 3) Assuming that you go with the average batch size calculation approach,

The average batch size approach is a quick and dirty approach for non-leaf 
operators that can observe an incoming batch to estimate row width. Because 
Drill batches are large, the law of large numbers means that the average of a 
large input batch is likely to be a good estimator for the average size of a 
large output batch.
Note that this works only because non-leaf operators have an input batch to 
sample. Leaf operators (readers) do not have this luxury. Hence the result set 
loader uses the actual accumulated size for the current batch.
Also note that the average row method, while handy, is not optimal. It will, in 
general, result in greater internal fragmentation than the result set loader. 
Why? The result set loader packs vectors right up to the point where the 
largest would overflow. The average row method works at the aggregate level and 
will likely result in wasted space (internal fragmentation) in the largest 
vector. Said another way, with the average row size method, we can usually pack 
in a few more rows before the batch actually fills, and so we end up with 
batches with lower "density" than the optimal. This is important when the 
consuming operator is a buffering one such as sort.
The key reason Padma is using the quick & dirty average row size method is not 
that it is ideal (it is not), but rather that it is, in fact, quick.
We do want to move to the result set loader over time so we get improved memory 
utilization. And, it is the only way to control row size in readers such as CSV 
or JSON in which we have no size information until we read the data.
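As a rough illustration of the rule described above (the names and the 
memory-budget parameter are hypothetical, not Drill's actual operator code):

class AverageRowSizer {
  static final int MAX_ROWS = 64 * 1024;  // per-batch row-count cap

  static int targetRowCount(long incomingBatchBytes, int incomingRowCount,
                            long outputMemoryBudget) {
    // Law of large numbers: for large batches, the incoming average row
    // width is a good estimator of the outgoing average row width.
    long avgRowWidth =
        Math.max(1, incomingBatchBytes / Math.max(1, incomingRowCount));
    return (int) Math.min(outputMemoryBudget / avgRowWidth, MAX_ROWS);
  }
}

A reader has no incoming batch to feed this estimate, which is why the result 
set loader measures the actual accumulated size instead.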
- Paul   

Re: Batch Sizing for Parquet Flat Reader

2018-02-11 Thread salim achouche
Thanks Parth for your feedback! I am planning to enhance the document based
on the received feedback and the prototype I am currently working on!

Regards,
Salim

On Sun, Feb 11, 2018 at 2:36 PM, salim achouche 
wrote:

> Thanks Paul for your feedback! Let me try to answer some of your questions
> / comments:
>
> *Duplicate Implementation*
> - I am *not* contemplating two different implementations; one for Parquet
> and another for the rest of the code
> - Instead, I am reacting to the fact that we have two different processing
> patterns Row Oriented and Columnar
> - The goal is to offer both strategies depending on the operator
>
> Complex vs. Flat Parquet Readers
> - The Complex and Flat Parquet readers are quite different
> - I presume, for the sake of performance, we can enhance our SQL
> capabilities so that the Flat Parquet reader can be invoked more frequently
>
> *Predicate Pushdown*
> - The reason I invoked Predicate Pushdown within the document is to help
> the analysis:
>o Notice how Record Batch materialization could involve many more pages
>o A solution that relies mainly on the current set of pages (one per
> column) might pay a heavy IO price without much to show for it
>   + By waiting for all columns to have at least one page loaded so
> that upfront stats are gathered
>   + Batch memory is then divided optimally across columns and the
> current batch size is computed
>   + Unfortunately, such logic will fail if more pages are involved
> than the ones taken into consideration
>o Example -
>   + Two variable length columns c1 and c2
>   + Reader waits for two pages P1-1 and P2-1 so that we a) allocate
> memory optimally across c1 and c2 and b) compute a batch size that will
> minimize overflow logic
>   + Assume, because of data length skew or predicate pushdown, that
> more pages are involved in loading the batch
>   + For c1: {P1-1, P1-2, P1-3, P1-4}, c2: {P2-1, P2-2}
>   + It is now highly possible that overflow logic might not be optimal
> since only two pages' statistics were considered instead of six
>
>  - I have added new logic to the ScanBatch so as to log (on demand) extra
> batch statistics which will help us assess the efficiency of the batch
> sizing strategy; I will add this information to the document when this
> sub-task is done
>
>
> *Implementation Strategy*
> - DRILL-6147's mission is to implement batch sizing for Flat Parquet with 
> *minimal
> overhead*
> - This will also help us test this functionality for end-to-end cases
> (whole query)
> - My next task (after DRILL-6147) is to incorporate your framework with
> Parquet
> - I'll a) enhance the framework to support columnar processing and b)
> refactor the Parquet code to use the framework
> *- *I agree there might be some duplicate effort but I really believe
> this will be minimal
> - DRILL-6147 is no more than one week of research & analysis and one week
> of implementation
>
> Regards,
> Salim
>
>
>
> On Feb 11, 2018, at 1:35 PM, Paul Rogers 
> wrote:
>
> Hi All,
> Perhaps this topic needs just a bit more thought and discussion to avoid
> working at cross purposes. I've outlined the issues, and a possible path
> forward, in a comment to DRILL-6147.
> Quick summary: creating a second batch size implementation just for
> Parquet will be very difficult once we handle all the required use cases as
> spelled out in the comment. We'd want to be very sure that we do, indeed,
> want to duplicate this effort before we head down that route. Duplicating
> the effort means repeating all the work done over the last six months to
> make the original result set loader work, and the future work needed to
> maintain two parallel systems. This is not a decision to make by default.
> Thanks,
> - Paul
>
>On Sunday, February 11, 2018, 12:10:58 AM PST, Parth Chandra <
> par...@apache.org> wrote:
>
> Thanks Salim.
> Can you add this to the JIRA/design doc. Also, I would venture to suggest
> that the section on predicate pushdown can be made clearer.
> Also, since you're proposing the average batch size approach with overflow
> handling, some detail on the proposed changes to the framework would be
> useful in the design doc. (Perhaps pseudo code and affected classes.)
> Essentially some guarantees provided by the framework will change and this
> may affect (or not) the existing usage. These should be enumerated in the
> design doc.
>
>
>
>


Re: Batch Sizing for Parquet Flat Reader

2018-02-11 Thread salim achouche
Thanks Paul for your feedback! Let me try to answer some of your questions / 
comments:

Duplicate Implementation
- I am not contemplating two different implementations; one for Parquet and 
another for the rest of the code
- Instead, I am reacting to the fact that we have two different processing 
patterns Row Oriented and Columnar
- The goal is to offer both strategies depending on the operator

Complex vs. Flat Parquet Readers
- The Complex and Flat Parquet readers are quite different
- I presume, for the sake of performance, we can enhance our SQL capabilities 
so that the Flat Parquet reader can be invoked more frequently

Predicate Pushdown
- The reason I invoked Predicate Pushdown within the document is to help the 
analysis:
   o Notice how Record Batch materialization could involve many more pages
   o A solution that relies mainly on the current set of pages (one per column) 
might pay a heavy IO price without much to show for it
  + By waiting for all columns to have at least one page loaded so that 
upfront stats are gathered 
  + Batch memory is then divided optimally across columns and the current 
batch size is computed
  + Unfortunately, such logic will fail if more pages are involved than the 
ones taken into consideration
   o Example -
  + Two variable length columns c1 and c2
  + Reader waits for two pages P1-1 and P2-1 so that we a) allocate memory 
optimally across c1 and c2 and b) compute a batch size that will minimize 
overflow logic
  + Assume, because of data length skew or predicate pushdown, that more 
pages are involved in loading the batch
  + For c1: {P1-1, P1-2, P1-3, P1-4}, c2: {P2-1, P2-2} 
  + It is now highly possible that overflow logic might not be optimal 
since only two pages' statistics were considered instead of six (see the 
sketch below)

 - I have added new logic to the ScanBatch so as to log (on demand) extra batch 
statistics which will help us assess the efficiency of the batch sizing 
strategy; I will add this information to the document when this sub-task is done
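
A hypothetical sketch of the sizing step described in the example above (not 
the DRILL-6147 code): estimate each column's average value width from its 
first page and divide the batch budget proportionally — precisely the estimate 
that later pages (P1-2, P1-3, ...) can invalidate.

class PageStatsBatchSizer {
  // Split a batch budget across columns in proportion to each column's
  // average value width, derived from one page's stats per column.
  // Assumes every page holds at least one value.
  static long[] divideBudget(long batchBudgetBytes,
                             long[] pageSizeBytes, long[] pageValueCounts) {
    int n = pageSizeBytes.length;
    double[] avgWidth = new double[n];
    double totalWidth = 0;
    for (int i = 0; i < n; i++) {
      avgWidth[i] = (double) pageSizeBytes[i] / pageValueCounts[i];
      totalWidth += avgWidth[i];
    }
    long[] perColumn = new long[n];
    for (int i = 0; i < n; i++) {
      perColumn[i] = (long) (batchBudgetBytes * (avgWidth[i] / totalWidth));
    }
    return perColumn;
  }
}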


Implementation Strategy
- DRILL-6147 mission is to implement batch sizing for Flat Parquet with minimal 
overhead
- This will also help us test this functionality for end-to-end cases (whole 
query)
- My next task (after DRILL-6147) is to incorporate your framework with Parquet 
- I'll a) enhance the framework to support columnar processing and b) 
refactor the Parquet code to use the framework
- I agree there might be some duplicate effort but I really believe this will 
be minimal
- DRILL-6147 is no more than one week of research & analysis and one week of 
implementation

Regards,
Salim



> On Feb 11, 2018, at 1:35 PM, Paul Rogers  wrote:
> 
> Hi All,
> Perhaps this topic needs just a bit more thought and discussion to avoid 
> working at cross purposes. I've outlined the issues, and a possible path 
> forward, in a comment to DRILL-6147.
> Quick summary: creating a second batch size implementation just for Parquet 
> will be very difficult once we handle all the required use cases as spelled 
> out in the comment. We'd want to be very sure that we do, indeed, want to 
> duplicate this effort before we head down that route. Duplicating the effort 
> means repeating all the work done over the last six months to make the 
> original result set loader work, and the future work needed to maintain two 
> parallel systems. This is not a decision to make by default.
> Thanks,
> - Paul
> 
>On Sunday, February 11, 2018, 12:10:58 AM PST, Parth Chandra 
>  wrote:  
> 
> Thanks Salim.
> Can you add this to the JIRA/design doc. Also, I would venture to suggest
> that the section on predicate pushdown can be made clearer.
> Also, since you're proposing the average batch size approach with overflow
> handling, some detail on the proposed changes to the framework would be
> useful in the design doc. (Perhaps pseudo code and affected classes.)
> Essentially some guarantees provided by the framework will change and this
> may affect (or not) the existing usage. These should be enumerated in the
> design doc.
> 
> 



Re: Batch Sizing for Parquet Flat Reader

2018-02-11 Thread Paul Rogers
Hi All,
Perhaps this topic needs just a bit more thought and discussion to avoid 
working at cross purposes. I've outlined the issues, and a possible path 
forward, in a comment to DRILL-6147.
Quick summary: creating a second batch size implementation just for Parquet 
will be very difficult once we handle all the required use cases as spelled out 
in the comment. We'd want to be very sure that we do, indeed, want to duplicate 
this effort before we head down that route. Duplicating the effort means 
repeating all the work done over the last six months to make the original 
result set loader work, and the future work needed to maintain two parallel 
systems. This is not a decision to make by default.
Thanks,
- Paul

On Sunday, February 11, 2018, 12:10:58 AM PST, Parth Chandra 
 wrote:  
 
 Thanks Salim.
Can you add this to the JIRA/design doc. Also, I would venture to suggest
that the section on predicate pushdown can be made clearer.
Also, since you're proposing the average batch size approach with overflow
handling, some detail on the proposed changes to the framework would be
useful in the design doc. (Perhaps pseudo code and affected classes.)
 Essentially some guarantees provided by the framework will change and this
may affect (or not) the existing usage. These should be enumerated in the
design doc.


  

[GitHub] drill issue #1110: DRILL-6115: SingleMergeExchange is not scaling up when ma...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on the issue:

https://github.com/apache/drill/pull/1110
  
@HanumathRao I have a few comments in the JIRA for the overall design; we 
can discuss. 


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167447863
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java
 ---
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOrderedMuxExchange extends PlanTestBase {
+
+  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
+
+
+  private void validateResults(BufferAllocator allocator, List<QueryDataBatch> results) throws SchemaChangeException {
+long previousBigInt = Long.MIN_VALUE;
+
+for (QueryDataBatch b : results) {
+  RecordBatchLoader loader = new RecordBatchLoader(allocator);
+  if (b.getHeader().getRowCount() > 0) {
+loader.load(b.getHeader().getDef(),b.getData());
+@SuppressWarnings({ "deprecation", "resource" })
+IntVector c1 = (IntVector) 
loader.getValueAccessorById(IntVector.class,
+   loader.getValueVectorId(new SchemaPath("id_i", 
ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+IntVector.Accessor a1 = c1.getAccessor();
+
+for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
+  assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), 
previousBigInt <= a1.get(i));
+  previousBigInt = a1.get(i);
+}
+  }
+  loader.clear();
+  b.release();
+}
+  }
+
+  /**
+   * Test case to verify the OrderedMuxExchange created for order by 
clause.
+   * It checks by forcing the plan to create OrderedMuxExchange and also 
verifies the
+   * output column is ordered.
+   *
+   * @throws Exception if anything goes wrong
+   */
+
+  @Test
+  public void testOrderedMuxForOrderBy() throws Exception {
+ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+.maxParallelization(1)
+
.configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
+;
+
+try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+  client.alterSession(ExecConstants.SLICE_TARGET, 10);
+  String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` 
ORDER BY id_i limit 10";
--- End diff --

I am not sure how the table is organized... does it already have an ordered 
id_i column? If so, we should use a different column. 


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167447232
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
 ---
@@ -20,133 +20,34 @@
 import com.google.common.collect.Lists;
 
 import org.apache.drill.exec.planner.physical.ExchangePrel;
-import org.apache.drill.exec.planner.physical.HashPrelUtil;
-import 
org.apache.drill.exec.planner.physical.HashPrelUtil.HashExpressionCreatorHelper;
-import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
-import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
-import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
-import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-
-import java.math.BigDecimal;
-import java.util.Collections;
 import java.util.List;
 
 public class InsertLocalExchangeVisitor extends BasePrelVisitor {
-  private final boolean isMuxEnabled;
-  private final boolean isDeMuxEnabled;
-
-
-  public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper<RexNode> {
-private final RexBuilder rexBuilder;
-
-public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) {
-  this.rexBuilder = rexBuilder;
-}
-
-@Override
-    public RexNode createCall(String funcName, List<RexNode> inputFields) {
-  final DrillSqlOperator op =
-  new DrillSqlOperator(funcName, inputFields.size(), true, false);
-  return rexBuilder.makeCall(op, inputFields);
-}
-  }
+  private final OptionManager options;
 
   public static Prel insertLocalExchanges(Prel prel, OptionManager 
options) {
 boolean isMuxEnabled = 
options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
 boolean isDeMuxEnabled = 
options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val;
 
 if (isMuxEnabled || isDeMuxEnabled) {
-  return prel.accept(new InsertLocalExchangeVisitor(isMuxEnabled, 
isDeMuxEnabled), null);
+  return prel.accept(new InsertLocalExchangeVisitor(options), null);
--- End diff --

Since the local variables isMuxEnabled / isDeMuxEnabled are not being used 
anymore, you can remove them on lines 33, 34. 


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167448218
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 ---
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * and a merge operation is performed to produce a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
+
+  private final List orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, 
@JsonProperty("orderings")List orderings) {
+super(child);
+this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+createSenderReceiverMapping();
+
+    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
+if (senders == null || senders.size() <= 0) {
+  throw new IllegalStateException(String.format("Failed to find 
senders for receiver [%d]", minorFragmentId));
+}
+
+if (logger.isDebugEnabled()) {
+  logger.debug(String.format("Minor fragment %d, receives data from 
following senders %s", minorFragmentId, senders));
+}
+
+return new MergingReceiverPOP(senderMajorFragmentId, senders, 
orderings, false);
--- End diff --

The HashToMergeExchange creates a MergingReceiver with spooling TRUE, 
whereas the SingleMergeExchange creates one with spooling FALSE.  Although we 
don't test the spooling, I feel the new OrderedMuxExchange should probably have 
the same spooling setting as the HashToMergeExchange since both do the merge on 
local drillbits vs. the foreman. 
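
If that suggestion is adopted, the change would presumably be a one-line swap 
of the last constructor argument shown in the diff:

// Match HashToMergeExchange's spooling setting (last constructor argument).
return new MergingReceiverPOP(senderMajorFragmentId, senders, orderings, true);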


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167448543
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java
 ---
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOrderedMuxExchange extends PlanTestBase {
+
+  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
+
+
+  private void validateResults(BufferAllocator allocator, List<QueryDataBatch> results) throws SchemaChangeException {
+long previousBigInt = Long.MIN_VALUE;
+
+for (QueryDataBatch b : results) {
+  RecordBatchLoader loader = new RecordBatchLoader(allocator);
+  if (b.getHeader().getRowCount() > 0) {
+loader.load(b.getHeader().getDef(),b.getData());
+@SuppressWarnings({ "deprecation", "resource" })
+IntVector c1 = (IntVector) 
loader.getValueAccessorById(IntVector.class,
+   loader.getValueVectorId(new SchemaPath("id_i", 
ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+IntVector.Accessor a1 = c1.getAccessor();
+
+for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
+  assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), 
previousBigInt <= a1.get(i));
+  previousBigInt = a1.get(i);
+}
+  }
+  loader.clear();
+  b.release();
+}
+  }
+
+  /**
+   * Test case to verify the OrderedMuxExchange created for order by 
clause.
+   * It checks by forcing the plan to create OrderedMuxExchange and also 
verifies the
+   * output column is ordered.
+   *
+   * @throws Exception if anything goes wrong
+   */
+
+  @Test
+  public void testOrderedMuxForOrderBy() throws Exception {
+ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+.maxParallelization(1)
+
.configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
+;
+
+try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+  client.alterSession(ExecConstants.SLICE_TARGET, 10);
+  String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` 
ORDER BY id_i limit 10";
+
+  String explainText = client.queryBuilder().sql(sql).explainText();
+  assertTrue(explainText.contains(ORDERED_MUX_EXCHANGE));
+  validateResults(client.allocator(), 
client.queryBuilder().sql(sql).results());
+}
+  }
+
+  /**
+   * Test case to verify the OrderedMuxExchange created for window 
functions.
+   * It checks by forcing the plan to create OrderedMuxExchange and also 
verifies the
+   * output column is ordered.
+   *
+   * @throws Exception if anything goes wrong
+   */
+
+  @Test
+  public void testOrderedMuxForWindowAgg() throws Exception {
+ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+.maxParallelization(1)
+
.configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, 

[GitHub] drill issue #1110: DRILL-6115: SingleMergeExchange is not scaling up when ma...

2018-02-11 Thread HanumathRao
Github user HanumathRao commented on the issue:

https://github.com/apache/drill/pull/1110
  
@vrozov  Thank you for reviewing the code. I have incorporated all the 
review comments. Please let me know if anything needs to be changed.


---


[GitHub] drill pull request #1117: DRILL-6089 Removed ordering trait from HashJoin in...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1117#discussion_r167441911
  
--- Diff: 
contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHashJoinOrdering.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.hive.HiveTestBase;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.QueryTestUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHashJoinOrdering extends HiveTestBase {
--- End diff --

I am wondering why a separate HashJoinOrdering test is needed for Hive; the 
Drill join planner is not doing anything specific for Hive tables. And if we 
have a Hive test, why not one for other storage plugins such as HBase? But 
since the proposed fix removes the collation property for any input stream of 
a HashJoin, it should not matter where that input originated. 


---


[GitHub] drill pull request #1117: DRILL-6089 Removed ordering trait from HashJoin in...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1117#discussion_r167442088
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
 ---
@@ -246,4 +246,17 @@ public RelNode convertChild(final DrillJoinRel join,  
final RelNode rel) throws
 
   }
 
+  // DRILL-6089 make sure no collations are added to HashJoin
+  public static RelTraitSet removeCollation(RelTraitSet traitSet, 
RelOptRuleCall call)
+  {
+RelTraitSet newTraitSet = call.getPlanner().emptyTraitSet();
+
+for (RelTrait trait: traitSet) {
+  if (!trait.getTraitDef().getTraitClass().equals(RelCollation.class)) 
{
--- End diff --

I suppose you already considered Calcite's RelTraitSet.replace() method and 
found this to be simpler?
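
For reference, a minimal sketch of the replace() alternative hinted at here, 
assuming Calcite's RelCollations.EMPTY as the substitute trait:

import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollations;

class CollationUtil {
  // Swap whatever collation trait is present for the empty collation,
  // leaving all other traits in the set untouched.
  static RelTraitSet removeCollation(RelTraitSet traitSet) {
    return traitSet.replace(RelCollations.EMPTY);
  }
}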


---


[GitHub] drill pull request #1117: DRILL-6089 Removed ordering trait from HashJoin in...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1117#discussion_r167442059
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
 ---
@@ -246,4 +246,17 @@ public RelNode convertChild(final DrillJoinRel join,  
final RelNode rel) throws
 
   }
 
+  // DRILL-6089 make sure no collations are added to HashJoin
+  public static RelTraitSet removeCollation(RelTraitSet traitSet, 
RelOptRuleCall call)
--- End diff --

Since this is static method, better to add it to PrelUtil class for general 
purpose use. 


---


[GitHub] drill pull request #1117: DRILL-6089 Removed ordering trait from HashJoin in...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1117#discussion_r167441485
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
 ---
@@ -61,7 +61,7 @@
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class);
   private static final ControlsInjector injector = 
ControlsInjectorFactory.getInjector(FragmentsRunner.class);
 
-  private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
+  private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 3;
--- End diff --

Not sure why this and the memory settings changes are in this PR... they are 
unrelated to DRILL-6089. 


---


[GitHub] drill pull request #1117: DRILL-6089 Removed ordering trait from HashJoin in...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1117#discussion_r167441417
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
 ---
@@ -197,4 +198,24 @@ public void emptyPartTest() throws Exception {
   BaseTestQuery.resetSessionOption(ExecConstants.SLICE_TARGET);
 }
   }
+
+  @Test // DRILL-6089
+  public void testJoinOrdering() throws Exception {
+final String query = "select * from dfs.`sample-data/nation.parquet` 
nation left outer join " +
+  "(select * from dfs.`sample-data/region.parquet`) " +
+  "as region on region.r_regionkey = nation.n_nationkey order by 
region.r_name desc";
+final String plan = getPlanInString("EXPLAIN PLAN for " + 
QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT);
+lastSortAfterJoin(plan);
--- End diff --

Most plan tests that we have use one of the utility methods in PlanTestBase 
(from which JoinTestBase is derived), which use Java's regex Pattern and 
Matcher classes.  In your query, is it necessary to check the index of the Sort 
vs. the HashJoin?  Since there is expected to be only one Sort (corresponding to 
the final ORDER BY), as long as there is a regex pattern that matches 
`'*Sort*HashJoin'`, I think that would be sufficient.  You might want to check 
the callers of [1] to see if it satisfies your requirement. 

[1] 
https://github.com/apache/drill/blob/master/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java#L82
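
A sketch of the suggested pattern-based check, reusing the query from this 
diff; the helper comes from Drill's PlanTestBase, though the exact regex (and 
how the helper treats multi-line plan text) should be verified:

@Test  // DRILL-6089
public void testJoinOrdering() throws Exception {
  final String query = "select * from dfs.`sample-data/nation.parquet` nation "
      + "left outer join (select * from dfs.`sample-data/region.parquet`) as region "
      + "on region.r_regionkey = nation.n_nationkey order by region.r_name desc";
  // Expect a Sort above the HashJoin in the plan text; no excluded patterns.
  testPlanMatchingPatterns(query, new String[] {"Sort.*HashJoin"}, new String[] {});
}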


---


Re: Batch Sizing for Parquet Flat Reader

2018-02-11 Thread Parth Chandra
Thanks Salim.
Can you add this to the JIRA/design doc. Also, I would venture to suggest
that the section on predicate pushdown can be made clearer.
Also, since you're proposing the average batch size approach with overflow
handling, some detail on the proposed changes to the framework would be
useful in the design doc. (Perhaps pseudo code and affected classes.)
 Essentially some guarantees provided by the framework will change and this
may affect (or not) the existing usage. These should be enumerated in the
design doc.





On Fri, Feb 9, 2018 at 11:52 PM, salim achouche 
wrote:

> Thank you Parth for providing feedback; please find my answers below:
>
> I have created Apache JIRA DRILL-6147
>  for tracking
> this improvement.
>
> >  2) Not sure where you were going with the predicate pushdown section and
> how it pertains to your proposed batch sizing.
>
> Predicate push down was part of the Design Considerations section; the
> intent is that the design should be able to handle future use-cases such as
> push down. Notice how the Page based Statistical Approach will not work
> well with predicate push down as one single batch can span many pages per
> column.
>
> > 3) Assuming that you go with the average batch size calculation approach,
> are you proposing to have a Parquet scan specific overflow implementation?
> Or are you planning to leverage the ResultSet loader mechanism? If you plan
> to use the latter, it will need to be enhanced to handle a bulk chunk as
> opposed to a single value at a time. If not using the ResultSet loader
> mechanism, why not (you would be reinventing the wheel) ?
>
> Padma Penumarthy and I are currently working on the batch sizing
> functionality and selected few TPCH queries to show case end-to-end use
> cases. Immediately after this task, I'll be working on enhancing the new
> framework to support columnar processing and as such retrofit DRILL-6147
> implementation as part of the new framework. So essentially we want to make
> progress on both fronts so that a) OOM conditions are minimized as soon as
> possible and b) the new reader framework, applied to all readers and
> operators, is rolled out over the next few releases.
>
> > Also note that memory allocations by Netty greater than the 16MB chunk size
> are returned to the OS when the memory is free'd. Both this document and
> the original document on memory fragmentation state incorrectly that such
> memory is not released back to the OS. A quick thought experiment - where
> does this memory go if it is not released back to the OS?
>
> I have the same understanding as you:
> - I think Paul meant that 16 MB blocks are not released to the OS (cached
> within Netty)
> - Many memory allocators exhibit the same behavior as the release mechanism
> is slow (heuristics used to decide when to release so to balance between
> performance and resource usage)
> - Basically, if Drill holds a large count of 16 MB blocks, then a 32 MB, 64
> MB, etc. memory allocation might fail since
>   *  none of the Netty allocated blocks can satisfy the new request
>   *  a new OS allocation will take Drill beyond the maximum direct memory
>
>
> On Fri, Feb 9, 2018 at 4:08 AM, Parth Chandra  wrote:
>
> > Is there a JIRA for this? Would be useful to capture the comments in the
> > JIRA. Note that the document itself is not comment-able as it is shared
> > with view-only permissions.
> >
> > Some thoughts in no particular order-
> > 1) The Page based statistical approach is likely to run into trouble with
> > the encoding used for Parquet fields especially RLE which drastically
> > changes the size of the field. So pageSize/numValues is going to be wildly
> > inaccurate with RLE.
> > 2) Not sure where you were going with the predicate pushdown section and
> > how it pertains to your proposed batch sizing.
> > 3) Assuming that you go with the average batch size calculation approach,
> > are you proposing to have a Parquet scan specific overflow implementation?
> > Or are you planning to leverage the ResultSet loader mechanism? If you plan
> > to use the latter, it will need to be enhanced to handle a bulk chunk as
> > opposed to a single value at a time. If not using the ResultSet loader
> > mechanism, why not (you would be reinventing the wheel) ?
> > 4) Parquet page level stats are probably not reliable. You can assume page
> > size (compressed/uncompressed) and value count are accurate, but nothing
> > else.
> >
> > Also note that memory allocations by Netty greater than the 16MB chunk size
> > are returned to the OS when the memory is free'd. Both this document and
> > the original document on memory fragmentation state incorrectly that such
> > memory is not released back to the OS. A quick thought experiment - where
> > does this memory go if it is not released back to the OS?
> >
> >
> >
> > On Fri, Feb 9, 2018 at 7:12 AM, salim