Alex Behm has posted comments on this change.

Change subject: IMPALA-5036: Parquet count star optimization
......................................................................


Patch Set 1:

(36 comments)

http://gerrit.cloudera.org:8080/#/c/6812/1//COMMIT_MSG
Commit Message:

PS1, Line 10: statistic
> I would prefer to use a different word than "statistic" in this context. We
Suggestions? How important do you think it is to distinguish the Parquet 
num_rows from parquet::Statistics? In the not-too-distant future we will use 
parquet::Statistics for optimizing min/max aggregates also. Seems fair enough 
to call all these "Parquet stats".


Line 13: 
Mention the new rewrite rule


http://gerrit.cloudera.org:8080/#/c/6812/1/be/src/exec/hdfs-parquet-scanner.cc
File be/src/exec/hdfs-parquet-scanner.cc:

Line 426:     // Populate the slot with Parquet num rows statistic.
Populate the single slot with the Parquet num rows statistic.


Line 431:     memset(tuple_buf, 0, tuple_buf_size);
Don't think we usually do this, and I don't think it's necessary.


Line 432:     while (!row_batch->AtCapacity()) {
Do we have a suitable file with many row groups for testing this logic? We can 
decrease the batch_size in the test.


Line 488:     DCHECK(row_group_idx_ <= file_metadata_.row_groups.size());
Why this change?


Line 1455:     // Column readers are not needed because we are not reading from 
any columns if this
DCHECK that there is exactly one materialized slot


http://gerrit.cloudera.org:8080/#/c/6812/1/be/src/exec/hdfs-scan-node-base.h
File be/src/exec/hdfs-scan-node-base.h:

Line 158:   const bool optimize_parquet_count_star() { return 
optimize_parquet_count_star_; }
const function


Line 373:   bool optimize_parquet_count_star_;
const


http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
File fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java:

Line 585:     if (copiedInputExprs.size() != inputExprs.size()) return;
Have you tried also substituting the FunctionCallExpr.mergeAggInputFn_ in 
AggregateInfo.substitute() so the Update() and Merge() are consistent again?

I understand what the issue is, but this does not seem quite right.


http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
File fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java:

Line 331:    * Return true if the slots being materialized are all partition 
columns.
The new behavior is tricky to reason about, can we simplify it?


Line 335:     if (materializedSlots.isEmpty()) return true;
I think this might break the behavior of OPTIMIZE_PARTITION_KEY_SCANS=true for 
unpartitioned tables.

What happens when you SET OPTIMIZE_PARTITION_KEY_SCANS=true and then run 
"select count(distinct 1) from unpartitioned_table"?


http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
File fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java:

Line 131:   // Set to true when this scan node can optimize a count(*) query by 
populating the
Sentence is a little misleading because this flag is initialized to true if the 
query block has an aggregation that is amenable to being optimized with Parquet 
stats. But then the flag is changed to have a different meaning, so it's a 
little difficult to understand what it does.

How about something along the lines of this:
1. Pass in a 'private final' flag that indicates whether the aggregation is 
amenable to optimization. This flag never changes.
2. If the optimization can be performed, set the smap. Use the smap nullness to 
determine whether the optimization was applied.


Line 136:   protected ExprSubstitutionMap aggSmap_;
// Should be applied to the AggregateInfo from the same query block.

Expand the comment to explain why we cannot use PlanNode.outputSmap_ for this 
purpose.

Suggest: optimizedAggSmap_


Line 243:       if (optimizeParquetCountStar_ && fileFormats.size() == 1 && 
conjuncts_.isEmpty() &&
Create a helper function for this optimization, and describe in its function 
comment what this optimization does and how it works. In particular, that we 
are adding a new scan slot.


Line 245:         // Create two functions that we will put into an smap. We 
want to replace the
Preconditions.checkState(desc_.getPath().destTable() == null);
Preconditions.checkState(collectionConjuncts_.isEmpty());

The first condition asserts that we are not scanning a nested collection.


Line 246:         // count function with a sum function.
count() and sum()

mention that it is a special sum() function


http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/planner/PlanNode.java
File fe/src/main/java/org/apache/impala/planner/PlanNode.java:

Line 213:    * @param limit
can just remove this


http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
File fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java:

Line 559:   private boolean checkOptimizeCountStar(SelectStmt selectStmt) {
Let's move this logic into AggregateInfo.

Rename the function and rephrase the comment to describe what the function does 
as opposed to what the caller intends to do with it, e.g.,:

hasCountStarOnly()

(similar to AggregateInfo.hasAllDistinctAgg())


Thinking about this a little more, I think we may eventually end up passing 
AggregateInfo directly into the HdfsScanNode (for min/max optimizations), but 
we can tackle that later.


Line 631:         checkOptimizeCountStar(selectStmt), analyzer);
I think we are missing a condition. We can only have a single table ref. Also 
needs a test.


Line 634:       if (root instanceof HdfsScanNode) {
brief comment (we may be optimizing the aggregation based on statistics from 
the scan)


http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/rewrite/CountConstantRule.java
File fe/src/main/java/org/apache/impala/rewrite/CountConstantRule.java:

Line 28:  * Replaces count(CONSTANT) with an aquivalent count{*}.
Replaces count(<literal>) with an equivalent count(*).


Line 32:  * count(2017) --> count(*)
add count(null) -> count(null)


Line 34: public class CountConstantRule implements ExprRewriteRule {
How about NormalizeCountStarRule


Line 45:     Expr e = origExpr.getChild(0);
e -> child


http://gerrit.cloudera.org:8080/#/c/6812/1/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
File 
testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test:

Line 1: # IMPALA-5036
The JIRA is not very useful. Instead, describe what this test is covering.

Add a test in ExprRewriteRulesTest for the new rewrite rule.

I think we can omit the JIRA-5036 everywhere (it applies to everything in this 
file, so not super useful)


Line 3: UNION ALL
I prefer consistent lower case or upper case, e.g., union all, or make the 
other SQL keywords upper case


Line 22: |  |  output: 
sum_zero_if_empty(functional_parquet.alltypes.parquet-stats: num_rows)
agg expr seems long, I'm thinking we can omit the "functional_parquet.alltypes" 
part because that should always be clear (optimization does not work if you 
have joins or more complicated plans)


Line 28: |  |  output: 
sum_zero_if_empty(functional_parquet.alltypes.parquet-stats: num_rows)
What do you think of "sum_init_zero()"? It's a little shorter and seems a 
little cleaner semantically because the "if_empty" part is a statement about 
the input of the aggregation node rather than the semantics of the function.


Line 112: # IMPALA-5036: group by partitioned columns.
partition columns


Line 141: # IMPALA-5036: The optimization is disabled because tinyint_col is 
not a partitioned col.
partition col


Line 143: ---- PLAN
add a negative case with a different agg function, something like:

select count(*), max(year) from functional_parquet.alltypes


Line 377: # IMPALA-5036
Also add a test like this:
select count(*) from (select int_col from functional_parquet.alltypes) v;


http://gerrit.cloudera.org:8080/#/c/6812/1/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
File 
testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test:

Line 314: WARNING: The following tables are missing relevant table and/or 
column statistics.
These seemed to have stats before? Something weird in your setup?


http://gerrit.cloudera.org:8080/#/c/6812/1/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test
File testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test:

Line 285: from functional_parquet.alltypes
I think we need at least one focused test on a file with many row groups and a 
smaller batch size to exercise the GetNextInternal() code paths, i.e., when we 
need several calls to GetNextInternal() to finish the scan range.


Line 292: # IMPALA-5036
Same here, JIRA number not very useful, prefer textual description of test


-- 
To view, visit http://gerrit.cloudera.org:8080/6812
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I536b85c014821296aed68a0c68faadae96005e62
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Taras Bobrovytsky <[email protected]>
Gerrit-Reviewer: Alex Behm <[email protected]>
Gerrit-Reviewer: Lars Volker <[email protected]>
Gerrit-HasComments: Yes

Reply via email to