Alex Behm has posted comments on this change. Change subject: IMPALA-5036: Parquet count star optimization ......................................................................
Patch Set 1: (36 comments) http://gerrit.cloudera.org:8080/#/c/6812/1//COMMIT_MSG Commit Message: PS1, Line 10: statistic > I would prefer to use a different word than "statistic" in this context. We Suggestions? How important do you think it is to distinguish the Parquet num_rows from parquet::Statistics? In the not-too-distant future we will use parquet::Statistics for optimizing min/max aggregates also. Seems fair enough to call all these "Parquet stats". Line 13: Mention the new rewrite rule http://gerrit.cloudera.org:8080/#/c/6812/1/be/src/exec/hdfs-parquet-scanner.cc File be/src/exec/hdfs-parquet-scanner.cc: Line 426: // Populate the slot with Parquet num rows statistic. Populate the single slot with the Parquet num rows statistic. Line 431: memset(tuple_buf, 0, tuple_buf_size); Don't think we usually do this, and I don't think it's necessary. Line 432: while (!row_batch->AtCapacity()) { Do we have a suitable file with many row groups for testing this logic? We can decrease the batch_size in the test. Line 488: DCHECK(row_group_idx_ <= file_metadata_.row_groups.size()); Why this change? Line 1455: // Column readers are not needed because we are not reading from any columns if this DCHECK that there is exactly one materialized slot http://gerrit.cloudera.org:8080/#/c/6812/1/be/src/exec/hdfs-scan-node-base.h File be/src/exec/hdfs-scan-node-base.h: Line 158: const bool optimize_parquet_count_star() { return optimize_parquet_count_star_; } const function Line 373: bool optimize_parquet_count_star_; const http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java File fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java: Line 585: if (copiedInputExprs.size() != inputExprs.size()) return; Have you tried also substituting the FunctionCallExpr.mergeAggInputFn_ in AggregateInfo.substitute() so the Update() and Merge() are consistent again? I understand what the issue is, but this does not seem quite right. http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java File fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java: Line 331: * Return true if the slots being materialized are all partition columns. The new behavior is tricky to reason about, can we simplify it? Line 335: if (materializedSlots.isEmpty()) return true; I think this might break the behavior of OPTIMIZE_PARTITION_KEY_SCANS=true for unpartitioned tables. What happens when you SET OPTIMIZE_PARTITION_KEY_SCANS=true and then run "select count(distinct 1) from unpartitioned_table"? http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java File fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java: Line 131: // Set to true when this scan node can optimize a count(*) query by populating the Sentence is a little misleading because this flag is initialized to true if the query block has an aggregation that is amenable to being optimized with Parquet stats. But then the flag is changed to have a different meaning, so it's a little difficult to understand what it does. How about something along the lines of this: 1. Pass in a 'private final' flag that indicates whether the aggregation is amenable to optimization. This flag never changes. 2. If the optimization can be performed, set the smap. Use the smap nullness to determine whether the optimization was applied. Line 136: protected ExprSubstitutionMap aggSmap_; // Should be applied to the AggregateInfo from the same query block. Expand the comment to explain why we cannot use PlanNode.outputSmap_ for this purpose. Suggest: optimizedAggSmap_ Line 243: if (optimizeParquetCountStar_ && fileFormats.size() == 1 && conjuncts_.isEmpty() && Create a helper function for this optimization, and describe in its function comment what this optimization does and how it works. In particular, that we are adding a new scan slot. Line 245: // Create two functions that we will put into an smap. We want to replace the Preconditions.checkState(desc_.getPath().destTable() == null); Preconditions.checkState(collectionConjuncts_.isEmpty()); The first condition asserts that we are not scanning a nested collection. Line 246: // count function with a sum function. count() and sum() mention that it is a special sum() function http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/planner/PlanNode.java File fe/src/main/java/org/apache/impala/planner/PlanNode.java: Line 213: * @param limit can just remove this http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java File fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java: Line 559: private boolean checkOptimizeCountStar(SelectStmt selectStmt) { Let's move this logic into AggregateInfo. Rename the function and rephrase the comment to describe what the function does as opposed to what the caller intends to do with it, e.g.,: hasCountStarOnly() (similar to AggregateInfo.hasAllDistinctAgg()) Thinking about this a little more, I think we may eventually end up passing AggregateInfo directly into the HdfsScanNode (for min/max optimizations), but we can tackle that later. Line 631: checkOptimizeCountStar(selectStmt), analyzer); I think we are missing a condition. We can only have a single table ref. Also needs a test. Line 634: if (root instanceof HdfsScanNode) { brief comment (we may be optimizing the aggregation based on statistics from the scan) http://gerrit.cloudera.org:8080/#/c/6812/1/fe/src/main/java/org/apache/impala/rewrite/CountConstantRule.java File fe/src/main/java/org/apache/impala/rewrite/CountConstantRule.java: Line 28: * Replaces count(CONSTANT) with an aquivalent count{*}. Replaces count(<literal>) with an equivalent count(*). Line 32: * count(2017) --> count(*) add count(null) -> count(null) Line 34: public class CountConstantRule implements ExprRewriteRule { How about NormalizeCountStarRule Line 45: Expr e = origExpr.getChild(0); e -> child http://gerrit.cloudera.org:8080/#/c/6812/1/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test File testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test: Line 1: # IMPALA-5036 The JIRA is not very useful. Instead, describe what this test is covering. Add a test in ExprRewriteRulesTest for the new rewrite rule. I think we can omit the JIRA-5036 everywhere (it applies to everything in this file, so not super useful) Line 3: UNION ALL I prefer consistent lower case or upper case, e.g., union all, or make the other SQL keywords upper case Line 22: | | output: sum_zero_if_empty(functional_parquet.alltypes.parquet-stats: num_rows) agg expr seems long, I'm thinking we can omit the "functional_parquet.alltypes" part because that should always be clear (optimization does not work if you have joins or more complicated plans) Line 28: | | output: sum_zero_if_empty(functional_parquet.alltypes.parquet-stats: num_rows) What do you think of "sum_init_zero()"? It's a little shorter and seems a little cleaner semantically because the "if_empty" part is a statement about the input of the aggregation node rather than the semantics of the function. Line 112: # IMPALA-5036: group by partitioned columns. partition columns Line 141: # IMPALA-5036: The optimization is disabled because tinyint_col is not a partitioned col. partition col Line 143: ---- PLAN add a negative case with a different agg function, something like: select count(*), max(year) from functional_parquet.alltypes Line 377: # IMPALA-5036 Also add a test like this: select count(*) from (select int_col from functional_parquet.alltypes) v; http://gerrit.cloudera.org:8080/#/c/6812/1/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test File testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test: Line 314: WARNING: The following tables are missing relevant table and/or column statistics. These seemed to have stats before? Something weird in your setup? http://gerrit.cloudera.org:8080/#/c/6812/1/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test File testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test: Line 285: from functional_parquet.alltypes I think we need at least one focused test on a file with many row groups and a smaller batch size to exercise the GetNextInternal() code paths, i.e., when we need several calls to GetNextInternal() to finish the scan range. Line 292: # IMPALA-5036 Same here, JIRA number not very useful, prefer textual description of test -- To view, visit http://gerrit.cloudera.org:8080/6812 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I536b85c014821296aed68a0c68faadae96005e62 Gerrit-PatchSet: 1 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Taras Bobrovytsky <[email protected]> Gerrit-Reviewer: Alex Behm <[email protected]> Gerrit-Reviewer: Lars Volker <[email protected]> Gerrit-HasComments: Yes
