Hello Aman Sinha, Zoltan Borok-Nagy, Peter Rozsa, Joe McDonnell, Csaba
Ringhofer, Noemi Pap-Takacs, Michael Smith, Impala Public Jenkins,
I'd like you to reexamine a change. Please visit
http://gerrit.cloudera.org:8080/24311
to look at the new patch set (#11).
Change subject: IMPALA-14999: Calcite planner: support Iceberg tables (part 2)
......................................................................
IMPALA-14999: Calcite planner: support Iceberg tables (part 2)
Note to reviewers: If this commit is too big, I can break this into smaller
chunks based on the description in this commit. The only issue with doing it
this way is that my testing was with existing Iceberg tests while using the
Calcite planner. It will be awkward to isolate the tests just for the purpose
of getting small parts of the Calcite functionality to work.
Second note to reviewers: As mentioned in the previous paragraph, the testing
was done with the Calcite planner. While a couple of tests were added to
calcite.test to ensure that some Iceberg tests are working, there is currently
no way to tell the difference between tests that work on Calcite and ones that
fail on Calcite and fall back to the original planner. A way to tell them
apart is still in the works and should be available within the coming months.
The above two paragraphs are just for the review process and will be removed
once this has been reviewed.
The main changes enabling Iceberg tables to be processed by the Calcite planner
can be found in:
- CalciteDb: The table fetched from metadata now allows FeIcebergTable objects.
This will instantiate a CalciteIcebergTable object. The CalciteIcebergTable
object isn't explicitly needed for supporting Iceberg tables, but it does
contain methods needed for count star optimization which will be explained
below.
- ImpalaHdfsScanRel: When an Iceberg table is found, it needs to instantiate
the physical nodes via the IcebergScanPlanner.
All the other changes in this commit are meant to deal with either specific
issues within Calcite that needed to change to handle edge cases or performance
optimizations for Iceberg. These changes include:
The biggest change in terms of code complexity is to handle the count star
optimization for Iceberg. Most, but not all, of the handling happens during
the logical node optimization rule, IcebergCountStarOptimizationRule, which
is called right before the logical node to physical node conversion. The
rule is broken down into v1 (no delete files) and v2 tables. This rule can
only be applied when the count star is applied for the whole table, i.e.
no groups or filters.
- v1 count optimization converts the Agg <- TableScan into a Values rel node
if there is only a count star in the query for the whole table, since the
count can be calculated at compilation time. If there are other aggregates
in the query, it also uses the constant retrieved, but it cannot remove
the aggregate calculation. In this case, it places a LogicalProject on top
of the Aggregate with the constant value and removes the count star
calculation from the aggregate.
- v2 count optimization precalculates the count star for rows that don't
have an associated delete file. It has to calculate the rows associated with
the delete files at runtime. These counts are added in a Project on top of
the aggregate. This is similar to the logic that is in SelectStmt at the
time of this commit. One part missing within the rule is the call to
tableRef.setOptimizeCountStarForIcebergV2(). The tableRef does not exist
until the physical node creation, so this set can only be done within
ImpalaHdfsScanRel (which calls CalciteIcebergTable).
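As a rough illustration of the v1 and v2 cases above (not the code in this
commit), the sketch below models the rewrite decision and the v2 arithmetic
in plain Java. The class, enum, and method names are hypothetical and exist
only for this example; the real rule operates on Calcite RelNodes.

```java
// Hypothetical sketch: these names are illustrative and do not exist in Impala.
final class CountStarRewriteSketch {
  enum Rewrite { NONE, VALUES, PROJECT_OVER_AGGREGATE }

  private CountStarRewriteSketch() {}

  // v1 decision: the rule applies only to a whole-table count(*), i.e. no
  // groups or filters. A lone count(*) becomes a constant Values node; with
  // other aggregates, the constant is projected on top of the Aggregate.
  static Rewrite chooseV1Rewrite(boolean hasGroupBy, boolean hasFilter,
      int countStarCalls, int otherAggCalls) {
    if (hasGroupBy || hasFilter || countStarCalls == 0) return Rewrite.NONE;
    return otherAggCalls == 0 ? Rewrite.VALUES : Rewrite.PROJECT_OVER_AGGREGATE;
  }

  // v2 arithmetic: rows in files without delete files are known at planning
  // time; rows in files with delete files are counted at runtime. The two
  // counts are added in a Project on top of the aggregate.
  static long v2Total(long plannedRowsWithoutDeletes, long runtimeRowsWithDeletes) {
    return Math.addExact(plannedRowsWithoutDeletes, runtimeRowsWithDeletes);
  }
}
```

For example, under these assumptions "select count(*) from t" maps to VALUES,
while "select count(*), max(x) from t" maps to PROJECT_OVER_AGGREGATE.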
Some additional code for count star optimization exists in ImpalaHdfsScanRel.
If there is a filter or group by on partitioned columns, we cannot
precalculate the counts, but we can push down predicates and examine file
metadata instead of the rows. Some logic in ImpalaHdfsScanRel and
CalciteIcebergTable has been created/refactored to handle this situation.
One note: IMPALA-14995 has been filed because the original planner does
not optimize a count(*) with a group by on a partitioned column, but the
Calcite planner does.
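One way to picture counting from file metadata instead of rows is the sketch
below. It assumes each data file exposes a single partition value and a
record count; FileMeta and countByPartition are hypothetical names for this
example and are not taken from ImpalaHdfsScanRel or CalciteIcebergTable.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: these names are illustrative and do not exist in Impala.
final class PartitionCountSketch {
  // Stand-in for per-file metadata: one partition value and a record count.
  static final class FileMeta {
    final String partitionValue;
    final long recordCount;

    FileMeta(String partitionValue, long recordCount) {
      this.partitionValue = partitionValue;
      this.recordCount = recordCount;
    }
  }

  private PartitionCountSketch() {}

  // Applies the pushed-down partition predicate to each file, then sums the
  // record counts per partition value without reading any rows.
  static Map<String, Long> countByPartition(List<FileMeta> files,
      Predicate<String> partitionFilter) {
    Map<String, Long> counts = new LinkedHashMap<>();
    for (FileMeta file : files) {
      if (!partitionFilter.test(file.partitionValue)) continue;
      counts.merge(file.partitionValue, file.recordCount, Long::sum);
    }
    return counts;
  }
}
```

The sketch ignores delete files, so it only corresponds to the case where the
file-level record counts can be trusted as-is.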
This covers the work needed for the count star optimization. There were
other fixes needed in the Calcite infrastructure to support the existing
code in IcebergScanPlanner. These include:
- The isPartitionKeyScan() method in IcebergScanPlanner, similar to count
star optimization, checks to see if all distinct columns used on the table are
partitioned columns. The IcebergScanPlanner looks at MultipleAggInfo for
this information, but this aggInfo doesn't exist for the Calcite planner.
The information is passed down through ParentPlanRelContext from the Aggregate
RelNode (this code already existed), and the check is handled within the
newly added ScanNodeHelper. This code has been added to other
*Node classes as well.
- Some code had to be refactored to enable runtime filters for Iceberg tables.
The Calcite planner registers equivalent columns in the valueTransferGraph in
the analyzer in the ImpalaJoinRel class. It needs the TableRef information from
the TableScans for this graph. The code before this commit retrieved this
information off of the ImpalaHdfsScanNode, but this cannot be done for Iceberg.
The new mechanism to get the TableRef information is to pass the information
up through the NodeWithExprs class, an existing way to handle all information
passed up from the lower nodes. All RelNode classes had to be changed to pass
the information up.
- Iceberg predicate pushdown looks for explicit classes and patterns that
Calcite was not using. Specifically:
  - NOT was being used in FunctionCallExpr. It is now converted to a
    CompoundPredicate.
  - LIKE was also being used in FunctionCallExpr. It is now converted to a
    LikePredicate.
  - This is more of a generic issue, but Calcite allows the "user" keyword
    without parens to be treated as a function. Since Impala allows this to
    be a column name and the Iceberg tests use this, the "user" function is
    treated as an Impala function (found in ImpalaOperatorTable).
  - For some optimizations, Iceberg requires the
    cast(my_string_timestamp as timestamp) function to be folded into a
    TimestampLiteral.
  - These changes were mostly made in RexLiteralConverter and
    RexCallConverter.
- In order to handle the predicate pushdowns, additional infrastructure needed
to change for Calcite. All literal and function expressions needed to be
analyzed after being converted. In order to handle this in one place, the code
was changed in CreateExprVisitor and ImpalaAnalyticRel, both of which use the
visitor pattern to create expressions. However, an issue came up with the
visitor pattern within Calcite in that checked exceptions cannot be passed
through the visitor. In order to handle this, the exception is saved in a
member variable. A RuntimeException is then thrown and caught by the caller
of the visitor. At this point, the member variable is checked and the saved
exception is rethrown as an AnalysisException.
- Because the analyze function is called for all converted expressions, the
IntervalExpr needed an implementation of a couple of methods. This is a dummy
Expr class which did not need analysis before because it is immediately thrown
away. It is still thrown away, but dummy analyze methods are now added.
- A small issue was found in filter simplification while debugging. If a
filter has a constant "false" condition, the Filter RelNode can be converted
to an empty Values RelNode. This code is within ImpalaFilterSimplifyRule.
- Various tests have been overridden in the e2e test files. The reasons are
documented in the test file. There is one edge case where an Iceberg
optimization could not be utilized. If the filter clause is on a partitioned
column "where bool_col = true", Calcite simplifies this to "where bool_col" and
this predicate is not optimized by Iceberg. A bug has been filed for this to
ensure this expression can be optimized by both the original Iceberg planner
and the Calcite Iceberg planner.
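The exception hand-off described above for CreateExprVisitor and
ImpalaAnalyticRel follows a common Java pattern, sketched here in isolation.
This is not the code from the commit: SketchAnalysisException stands in for
Impala's AnalysisException, and the Visitor interface, AnalyzingVisitor, and
convert() are hypothetical names for a visitor whose methods declare no
checked exceptions.

```java
// Hypothetical sketch: these names are illustrative and do not exist in Impala.
final class VisitorExceptionSketch {
  // Stand-in for a checked analysis exception.
  static final class SketchAnalysisException extends Exception {
    SketchAnalysisException(String message) { super(message); }
  }

  // A visitor interface whose method cannot throw checked exceptions.
  interface Visitor<R> { R visit(String expr); }

  static final class AnalyzingVisitor implements Visitor<String> {
    private SketchAnalysisException saved;  // set when analysis fails

    @Override
    public String visit(String expr) {
      try {
        return analyze(expr);
      } catch (SketchAnalysisException e) {
        saved = e;                      // remember the checked exception
        throw new RuntimeException(e);  // unwind through the visitor
      }
    }

    SketchAnalysisException getSaved() { return saved; }

    private static String analyze(String expr) throws SketchAnalysisException {
      if (expr.isEmpty()) throw new SketchAnalysisException("empty expression");
      return "analyzed(" + expr + ")";
    }
  }

  private VisitorExceptionSketch() {}

  // The visitor caller: catches the RuntimeException, then rethrows the saved
  // checked exception if there is one.
  static String convert(String expr) throws SketchAnalysisException {
    AnalyzingVisitor visitor = new AnalyzingVisitor();
    try {
      return visitor.visit(expr);
    } catch (RuntimeException e) {
      if (visitor.getSaved() != null) throw visitor.getSaved();
      throw e;  // unrelated runtime failure, propagate unchanged
    }
  }
}
```

Checking the member variable, rather than unwrapping the cause of the
RuntimeException, keeps unrelated runtime failures from being mistaken for
analysis errors.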
This commit does not support all Iceberg queries. It focuses on the general
Iceberg queries and various optimizations that are tested. Among Iceberg queries
not supported (and this may not be a complete list) are queries that have:
metadata columns, complex columns, table sampling, time travel, lineage.
Testing: While there is a lot of code here, only one new test has been added to
calcite.test. This test is to ensure that an Iceberg table can be run through
Calcite. Unfortunately, in its current state, there is no differentiation
between queries run through Calcite and queries that fail at compilation time
and fall back to the original planner. The only exceptions are tests that are
overridden with CALCITE_PLANNER_RESULTS, so there is at least some coverage
there. Soon, testing will be possible with no fallback planner, so that all
Iceberg tests will be run and verified through the Calcite planner.
Change-Id: I1854012b1caac63ced292b338d40074db950b42d
---
M fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
M fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/ScanNodeHelper.java
M fe/src/main/java/org/apache/impala/planner/ScanNodeHelperImpl.java
M fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
M fe/src/main/java/org/apache/impala/util/IcebergUtil.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/IntervalExpr.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/RexCallConverter.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/RexLiteralConverter.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaOperatorTable.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAggRel.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaJoinRel.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaSortRel.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaValuesRel.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeCreationUtils.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/PrunedPartitionHelper.java
A java/calcite-planner/src/main/java/org/apache/impala/calcite/rules/IcebergCountStarOptimizer.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/rules/ImpalaFilterSimplifyRule.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java
A java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteIcebergTable.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/service/UnsupportedChecker.java
M testdata/workloads/functional-query/queries/QueryTest/calcite.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-runtime-filter.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert-default.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert-v1.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert-v2.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-scan-metrics-basic.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-count-star-optimization-in-complex-query.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
M testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
M testdata/workloads/functional-query/queries/QueryTest/no-block-locations-hdfs-only.test
M testdata/workloads/functional-query/queries/QueryTest/no-block-locations.test
47 files changed, 847 insertions(+), 154 deletions(-)
git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/11/24311/11
--
To view, visit http://gerrit.cloudera.org:8080/24311
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I1854012b1caac63ced292b338d40074db950b42d
Gerrit-Change-Number: 24311
Gerrit-PatchSet: 11
Gerrit-Owner: Steve Carlin <[email protected]>
Gerrit-Reviewer: Aman Sinha <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Joe McDonnell <[email protected]>
Gerrit-Reviewer: Michael Smith <[email protected]>
Gerrit-Reviewer: Noemi Pap-Takacs <[email protected]>
Gerrit-Reviewer: Peter Rozsa <[email protected]>
Gerrit-Reviewer: Steve Carlin <[email protected]>
Gerrit-Reviewer: Zoltan Borok-Nagy <[email protected]>