Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20677 )
Change subject: IMPALA-12313: (part 2) Limited UPDATE support for Iceberg tables
......................................................................

Patch Set 4:

(55 comments)

Thanks for the comments!

http://gerrit.cloudera.org:8080/#/c/20677/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20677/3//COMMIT_MSG@10
PS3, Line 10: any of
            : the following is t
> Nit: "any of the following is".

Done

http://gerrit.cloudera.org:8080/#/c/20677/3//COMMIT_MSG@46
PS3, Line 46: Insert sink (i, s, 5)
> Just an idea: How difficult / expensive would it be to check the rows we up

That's a good idea. In simple cases this might be doable by adding extra predicates. E.g.:

UPDATE tbl SET k = 3 WHERE i > 4;
==>
UPDATE tbl SET k = 3 WHERE i > 4 AND k != 3;

Other than these planner-side extra predicates, I wouldn't complicate the backend executors. But yeah, let's think more about this and do it separately. Filed IMPALA-12588.

http://gerrit.cloudera.org:8080/#/c/20677/3//COMMIT_MSG@66
PS3, Line 66: Why does this patc
> Nit: "Why does this patch have..." would be better.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3//COMMIT_MSG@80
PS3, Line 80: smaller problem
            : then inefficient data files
> Why is it a smaller problem? Is it because we expect that there is more dat

Yes, I was thinking about having as few data files as possible and having the data in a good order, so you get good encoding and page filtering capabilities. I don't think the other way around, i.e. as few delete files as possible but more data files than necessary, would be beneficial.

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/iceberg-delete-sink.cc
File be/src/exec/iceberg-delete-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/iceberg-delete-sink.cc@117
PS3, Line 117: .size(), 2);
> Not modified in this change, but does this variable do anything?

Removed.
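
To make the COMMIT_MSG@46 idea (IMPALA-12588) a bit more concrete, here is a toy Python model. It is not Impala code: rows are dicts, None stands for SQL NULL, and the helper names are hypothetical. The model extends the WHERE clause with a check that at least one SET column would actually change, and shows that the table ends up the same as with the unfiltered UPDATE while fewer rows are rewritten. One design note: the predicate `k != 3` as written would also skip rows where k is NULL, because NULL != 3 evaluates to NULL in SQL, even though those rows do change. The sketch therefore uses NULL-safe IS DISTINCT FROM semantics, OR-ed across all SET columns.

```python
def is_distinct_from(a, b):
    """SQL 'a IS DISTINCT FROM b': inequality that treats NULL (None) as a value."""
    if a is None and b is None:
        return False
    if a is None or b is None:
        return True
    return a != b


def update(rows, set_values, where, skip_noops):
    """Model of UPDATE ... SET <set_values> WHERE <where>.

    Returns (new_rows, rewritten). With skip_noops=True the WHERE clause is
    extended with 'col1 IS DISTINCT FROM v1 OR col2 IS DISTINCT FROM v2 ...'
    (hypothetical rewrite), so rows that would not change are left alone.
    """
    def changes_something(row):
        return any(is_distinct_from(row[c], v) for c, v in set_values.items())

    new_rows, rewritten = [], 0
    for row in rows:
        if where(row) and (not skip_noops or changes_something(row)):
            new_rows.append({**row, **set_values})  # delete old + insert new
            rewritten += 1
        else:
            new_rows.append(row)
    return new_rows, rewritten


rows = [
    {"i": 5, "k": 3},     # already k = 3: a no-op for the UPDATE below
    {"i": 6, "k": 1},
    {"i": 7, "k": None},  # NULL k must still be updated to 3
    {"i": 2, "k": 9},     # filtered out by i > 4
]
# UPDATE tbl SET k = 3 WHERE i > 4
naive, n_naive = update(rows, {"k": 3}, lambda r: r["i"] > 4, skip_noops=False)
pruned, n_pruned = update(rows, {"k": 3}, lambda r: r["i"] > 4, skip_noops=True)
print(n_naive, n_pruned, naive == pruned)  # 3 2 True
```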

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/iceberg-delete-sink.cc@137
PS3, Line 137: "If thi
> Does 'dml_exec_state_' need to be the same as 'state->dml_exec_state()', wh

No, dml_exec_state_ is the delete sink's own object. When we are doing an UPDATE and have two sinks (insert sink, delete sink), it's not possible to use the same dml_exec_state object simultaneously without violating some preconditions. So IcebergDeleteSink now has its own, which is merged into state->dml_exec_state() in Close(). Added a comment about this to make it clear.

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/multi-table-sink.h
File be/src/exec/multi-table-sink.h:

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/multi-table-sink.h@69
PS3, Line 69: /// END: Methods above just delegate calls to the child sinks in 'table_sinks_'.
> I'd consider writing the explanatory comment of BEGIN also here ("Following

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/multi-table-sink.cc
File be/src/exec/multi-table-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/multi-table-sink.cc@52
PS3, Line 52:
> Instead of a C-style cast we could use static_cast, maybe with a DCHECK con

Since this is not hot code, I'm using dynamic_cast and DCHECK_NOTNULL.

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/multi-table-sink.cc@94
PS3, Line 94: DataSink::Close(state);
> Shouldn't DataSink::Close() already set 'closed_' to true? We could also re

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/table-sink-base.cc
File be/src/exec/table-sink-base.cc:

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/table-sink-base.cc@419
PS3, Line 419: if (dml_exec_state == nullptr) dml_exec_state = state->dml_exec_state();
> When is it possible that 'dml_exec_state' is not null and is not the same a

By default we are using state->dml_exec_state(), unless the caller provides an explicit dml_exec_state.
Currently the IcebergDeleteSink provides an explicit dml exec state because it has its own. But I don't want to make assumptions here like 'is_delete' <==> 'dml_exec_state != nullptr', because they are independent parameters and the connection between them might only be temporary. In fact, in part 3 the IcebergDeleteSink might lose its own dml_exec_state, as UPDATEs will use a different delete sink.

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/exec/table-sink-base.cc@430
PS3, Line 430:
> If 'is_delete' is true, shouldn't this always be true also? We could add a

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/runtime/dml-exec-state.cc
File be/src/runtime/dml-exec-state.cc:

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/runtime/dml-exec-state.cc@483
PS3, Line 483: et_n
> What does ASDF mean?

ASDF does not mean anything; it's just easy to type and search for in log messages during debugging. Removed.

http://gerrit.cloudera.org:8080/#/c/20677/3/be/src/runtime/dml-exec-state.cc@565
PS3, Line 565:
> Shouldn't 'is_iceberg' always be true for delete files?

Yes, removed this parameter.

http://gerrit.cloudera.org:8080/#/c/20677/3/common/protobuf/control_service.proto
File common/protobuf/control_service.proto:

http://gerrit.cloudera.org:8080/#/c/20677/3/common/protobuf/control_service.proto@95
PS3, Line 95: 7
> Did you intentionally skip 7?

No, fixed it.

http://gerrit.cloudera.org:8080/#/c/20677/3/common/thrift/DataSinks.thrift
File common/thrift/DataSinks.thrift:

http://gerrit.cloudera.org:8080/#/c/20677/3/common/thrift/DataSinks.thrift@187
PS3, Line 187:
> Maybe 'child_data_sinks' would be better.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
File fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java@65
PS3, Line 65: // 0 is reserved for table sinks
> Shouldn't this come directly after 'targetTable_'?

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java@20
PS3, Line 20: import java.util.List;
            :
> Unused imports.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java@23
PS3, Line 23: import org.apache.impala.common.AnalysisException;
            : import org.apache.impala.common.Pair;
            : import org.apache.impala.planner.Da
> Unused imports.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java@27
PS3, Line 27: import org.apache.impala.planner.TableSink;
> Unused import.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java@36
PS3, Line 36: buildAndValidate
> I can't find the 'createSourceStmt()' that sets these variables. Aren't the

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java@53
PS3, Line 53: super(modifyStmt);
> This comment seems stale, the following line was deleted.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@48
PS3, Line 48: private int deleteTableId_ = -1;
> Optional: could add a comment that it is set in 'analyze()'.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@51
PS3, Line 51: buildAndValidate
> I can't find the 'createSourceStmt()' that sets these variables. Aren't the

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@75
PS3, Line 75: update
> "update mode"?

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@91
PS3, Line 91: sorte
> I don't think "being" is needed here.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@123
PS3, Line 123: c, originalTargetTable_.getDefaultPartitionSpec())) {
> This error message is incorrect, it should be about 'c' being a complex typ

Removed this whole check, as we don't allow updating tables with complex types at all.

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
File fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java@40
PS3, Line 40: ///////
> I can't find the 'createSourceStmt()' that sets these variables. Aren't the

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java@51
PS3, Line 51: i'th position in t
> Is this set anywhere?

No. I initially introduced it for Iceberg, and it got duplicated during refactorings. It is not relevant for Kudu.

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java@110
PS3, Line 110: Column c = lhsSlotRef.getResolvedPath().destColumn();
> Maybe we should open a Jira for this? Even if Kudu supports nested types in

I just moved this code and the TODO message around. I think we will need to deal with this when we want to write complex types and Kudu supports them. I don't think it will be relevant in the near future. There are complex types in Iceberg as well: Impala can read them, we already throw an error if the user wants to write such a table, and we have tests for that.

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
File fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java@69
PS3, Line 69: yStmt;
> I can't see that it is set in createSourceStmt(), or anywhere else in this

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java@101
PS3, Line 101: }
> Is 'sourceStmt_.analyze(analyzer)' moved here because 'source Stmt_' should

Yes, otherwise we get an exception.

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java@124
PS3, Line 124:
> Superfluous spaces, we have an indentation of 4 before continuation lines.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java@47
PS3, Line 47: bleI
> It is set in the constructor, I don't think it needs to be set here too.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
File fe/src/main/java/org/apache/impala/planner/MultiDataSink.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java@35
PS3, Line 35: * subset of columns than other data sinks. One example is Iceberg UPDATEs, which
> We could mention UPDATE as an example.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java@41
PS3, Line 41: f 0 means no limi
> If it is the max number of MULTI sinks, the name should be 'maxNumberOfMult

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java@50
PS3, Line 50: /**
> We could add a comment that this should be called after all child sinks hav

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java@129
PS3, Line 129: /**
> Shouldn't we mention 'maxNumberOfSinks_' here? Or is it connected to MAX_HD

It's actually the MAX_FS_WRITERS query option (updated), and yes, maxNumberOfSinks_ is set to its value.

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java@134
PS3, Line 134: public int getNumNodes() {
> "there are more nodes than instances" - can it only happen if 'maxNumberOfS

I think this is only relevant if we want to limit the number of nodes/instances by MAX_FS_WRITERS. However, this code was just copied from the DELETE/INSERT sink, and currently it works for neither DELETE nor UPDATE. Let me handle these in a separate Jira (IMPALA-12587), as this CR is already huge and the issue is not too severe: we always shuffle rows around based on partition values, so the end result, i.e. the number of files written, shouldn't be affected in most cases.
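
The sink layout discussed in the multi-table-sink.h/.cc and MultiDataSink.java comments, together with the dml_exec_state replies for iceberg-delete-sink.cc@137 and table-sink-base.cc@419, can be pictured with a small Python model. Every class and field here is a hypothetical simplification, not Impala's C++ or Java code: a multi sink forwards open/send/close to its child sinks, each child writes only its own subset of columns, a child uses the shared runtime DML state unless it is given its own, and a child with its own state merges it into the shared one on close.

```python
from dataclasses import dataclass, field


@dataclass
class DmlStats:
    """Hypothetical stand-in for a DML exec state: simple counters."""
    rows_written: int = 0
    files: list = field(default_factory=list)

    def merge(self, other):
        self.rows_written += other.rows_written
        self.files.extend(other.files)


class RuntimeState:
    """Hypothetical per-fragment state that owns the shared DML stats."""
    def __init__(self):
        self.dml_exec_state = DmlStats()


class TableSink:
    """A child sink that writes only its own subset of columns."""
    def __init__(self, name, columns, own_state=None):
        self.name = name
        self.columns = columns
        self.own_state = own_state  # None means "use the shared state"
        self.stats = None
        self.closed = False

    def open(self, state):
        # Default to the shared state unless an explicit one was provided.
        self.stats = self.own_state if self.own_state is not None else state.dml_exec_state

    def send(self, batch):
        # Project each row onto this sink's columns and count the rows.
        projected = [tuple(row[c] for c in self.columns) for row in batch]
        self.stats.rows_written += len(projected)
        return projected

    def close(self, state):
        self.stats.files.append(f"{self.name}-0")  # pretend one file was written
        if self.own_state is not None:
            # A sink with private stats folds them into the shared state once.
            state.dml_exec_state.merge(self.own_state)
        self.closed = True


class MultiSink:
    """Delegates every call to its child sinks."""
    def __init__(self, children):
        self.children = children

    def open(self, state):
        for child in self.children:
            child.open(state)

    def send(self, batch):
        return {child.name: child.send(batch) for child in self.children}

    def close(self, state):
        for child in self.children:
            child.close(state)


# UPDATE: the insert child writes the new row values, the delete child writes
# the position of the old row. Column names are illustrative only.
state = RuntimeState()
sink = MultiSink([
    TableSink("insert", ["i", "s"]),
    TableSink("delete", ["file_path", "pos"], own_state=DmlStats()),
])
sink.open(state)
out = sink.send([{"i": 1, "s": "a", "file_path": "f1.parq", "pos": 0},
                 {"i": 2, "s": "b", "file_path": "f1.parq", "pos": 1}])
sink.close(state)
print(out["insert"])  # [(1, 'a'), (2, 'b')]
print(out["delete"])  # [('f1.parq', 0), ('f1.parq', 1)]
print(state.dml_exec_state.rows_written, state.dml_exec_state.files)  # 4 ['insert-0', 'delete-0']
```

Giving the delete child its own state mirrors the reply on iceberg-delete-sink.cc@137: two sinks using one state object simultaneously could violate its preconditions, so the merge happens exactly once, at close.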

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java@143
PS3, Line 143:
> Shouldn't we mention 'maxNumberOfSinks_' here? Or is it connected to MAX_HD

See above.

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/service/Frontend.java
File fe/src/main/java/org/apache/impala/service/Frontend.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/service/Frontend.java@2613
PS3, Line 2613: }
> From this line the function is the same as the corresponding part of addFin

I extracted the creation of TIcebergDmlFinalizeParams but left the last two statements in place, so it is easier to follow the construction of these objects.

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
File fe/src/main/java/org/apache/impala/util/IcebergUtil.java:

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/util/IcebergUtil.java@1134
PS3, Line 1134: if (!icebergPartSpec.hasPartitionFields()) return;
> Nit: 'tableName' could come after 1135.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/fe/src/main/java/org/apache/impala/util/IcebergUtil.java@1155
PS3, Line 1155: getIcebergPartitionTransformExp
> The two 'getIcebergPartitionTransformExpr()' functions are only used here.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
File testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test:

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test@1
PS3, Line 1: UPDATE iceberg_v2_no_deletes set s = concat(s,s) where i = 3
> Shouldn't we also check that the updates have really been applied? Or is th

In these .test files we can only test the plans. Also, the queries in them are not executed.

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
File testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test:

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test@176
PS3, Line 176: update ice_alltypes set bigint_col = 33, int_col = 3, string_col = 'three';
> Is this query intentionally duplicated?

It wasn't intentional, but I'm thinking about keeping it anyway, especially if we want to optimize UPDATE so that it doesn't modify anything here (the RUNTIME_PROFILE values would be 0s).

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test@200
PS3, Line 200: update ice_alltypes set bigint_col=bi, string_col=s, date_col=d from ice_alltypes, ref_table where int_col = i;
> What would/should happen if in the join, for a row in ice_alltypes, there w

Good catch. It is currently problematic with both Kudu and Iceberg. Kudu randomly updates the value (filed IMPALA-12586). For Iceberg, our current implementation duplicates the records.

I was thinking about dropping the UPDATE FROM syntax for Iceberg tables, because it is non-standard anyway. Hive UPDATE also doesn't have a FROM clause: https://cwiki.apache.org/confluence/display/hive/languagemanual+dml#LanguageManualDML-Update
It was also problematic in Microsoft SQL Server: https://sqlserverfast.com/blog/hugo/2008/03/lets-deprecate-update-from/

Then I realized we could check for duplicated values in IcebergDeleteSink. Now it is safer than Kudu's UPDATE FROM. Since we don't have a MERGE statement yet, UPDATE FROM is very valuable, as it is half a MERGE statement. We can safely check for duplicates as long as the duplicated rows are transferred to the same IcebergDeleteSink operator.
Currently we are shuffling the rows around based on the original partition spec id and the original partition values, so this will hold (we shuffle based on the original, unchanged data). In part 3, when we allow modifying partition columns and shuffle rows around based on the new partition spec and the new partition values, we will still be OK in most cases, except one: updating a partition column AND the right-hand side is a non-constant value AND there is a FROM clause. For this edge case we can just throw an error during analysis.

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test@270
PS3, Line 270: AMENODE/test-wareho
> Nit: this could be 'ice_time_transforms_t[ime]s[tamp]' to have the same log

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
File testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test:

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test@750
PS3, Line 750: Deletes
> This is a delete query, not an update.

I followed the pattern above (see INSERT/TRUNCATE/COMPUTE STATS). I changed the new code; I was thinking about changing the old comments too, but that would mess up git blame for little gain.

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_and_row_filtering.test
File testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_and_row_filtering.test:

http://gerrit.cloudera.org:8080/#/c/20677/3/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_and_row_filtering.test@189
PS3, Line 189:
> This is a delete query, not an update.

I followed the pattern above; anyway, I changed the new comments. Also moved it to ranger_row_filtering.test.
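
The UPDATE ... FROM duplicate check from the iceberg-update-basic.test@200 reply relies on two steps, sketched below in Python under stated assumptions. First, rows are routed to delete sinks by the original partition spec id and partition values, so every join result that refers to the same original row reaches the same sink. Second, each sink sorts its (file_path, pos) pairs so duplicates become adjacent and a comparison with the previous entry finds them. The field names, the crc32-based routing, and the error text are hypothetical; this is not the IcebergDeleteSink implementation.

```python
import zlib
from collections import defaultdict


def route(rows, num_sinks):
    """Send each row to a delete sink chosen by its ORIGINAL partition.

    Join results that refer to the same target row carry the same original
    partition spec id and values, so they always land on the same sink.
    """
    buckets = defaultdict(list)
    for row in rows:
        key = repr((row["spec_id"], row["part_values"])).encode()
        buckets[zlib.crc32(key) % num_sinks].append(row)
    return buckets


def check_duplicates(rows):
    """Raise if one sink is asked to delete the same (file_path, pos) twice.

    After sorting, duplicates are adjacent, so comparing each position with
    the previous one is enough.
    """
    prev = None
    for row in sorted(rows, key=lambda r: (r["file_path"], r["pos"])):
        cur = (row["file_path"], row["pos"])
        if cur == prev:
            raise ValueError(f"Duplicated row in delete sink: {cur}")
        prev = cur


# One ice_alltypes row (f1.parq, position 7) matches two ref_table rows, so the
# join produces two delete records for it. Field names are illustrative only.
joined = [
    {"spec_id": 0, "part_values": (2023,), "file_path": "f1.parq", "pos": 7, "bi": 10},
    {"spec_id": 0, "part_values": (2023,), "file_path": "f1.parq", "pos": 7, "bi": 20},
    {"spec_id": 0, "part_values": (2024,), "file_path": "f2.parq", "pos": 3, "bi": 30},
]
errors = []
for rows in route(joined, num_sinks=3).values():
    try:
        check_duplicates(rows)
    except ValueError as e:
        errors.append(str(e))
print(errors)  # ["Duplicated row in delete sink: ('f1.parq', 7)"]
```

If the routing key were instead the new partition values computed from a non-constant right-hand side, the two records for position 7 could get different keys and end up on different sinks, where neither sees the duplicate. That is the edge case the reply proposes to reject during analysis.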

http://gerrit.cloudera.org:8080/#/c/20677/3/tests/query_test/test_iceberg.py
File tests/query_test/test_iceberg.py:

http://gerrit.cloudera.org:8080/#/c/20677/3/tests/query_test/test_iceberg.py@1193
PS3, Line 1193: if
> Instead of an IF, this could be "return vector.get_value('exec_option')['di

It might be shorter, but the current version is more readable IMO, and more importantly it's easier to add new criteria in this form.

http://gerrit.cloudera.org:8080/#/c/20677/4/tests/query_test/test_iceberg.py
File tests/query_test/test_iceberg.py:

http://gerrit.cloudera.org:8080/#/c/20677/4/tests/query_test/test_iceberg.py@1321
PS4, Line 1321: d
> flake8: E999 SyntaxError: invalid syntax

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/tests/stress/test_update_stress.py
File tests/stress/test_update_stress.py:

http://gerrit.cloudera.org:8080/#/c/20677/3/tests/stress/test_update_stress.py@96
PS3, Line 96: cols =
> Nit could be 'updates_per_col'.

Done

http://gerrit.cloudera.org:8080/#/c/20677/3/tests/stress/test_update_stress.py@97
PS3, Line 97: =
> This could be written as 'updates[_per_col]' * 3, where 3 is the number of

Done

http://gerrit.cloudera.org:8080/#/c/20677/4/tests/stress/test_update_stress.py
File tests/stress/test_update_stress.py:

http://gerrit.cloudera.org:8080/#/c/20677/4/tests/stress/test_update_stress.py@100
PS4, Line 100: u
> flake8: F821 undefined name 'updates'

Done

http://gerrit.cloudera.org:8080/#/c/20677/4/tests/stress/test_update_stress.py@101
PS4, Line 101: u
> flake8: F821 undefined name 'updates'

Done

http://gerrit.cloudera.org:8080/#/c/20677/4/tests/stress/test_update_stress.py@102
PS4, Line 102: u
> flake8: F821 undefined name 'updates'

Done

--
To view, visit http://gerrit.cloudera.org:8080/20677
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iff0ef6075a2b6ebe130d15daa389ac1a505a7a08
Gerrit-Change-Number: 20677
Gerrit-PatchSet: 4
Gerrit-Owner: Zoltan Borok-Nagy
Gerrit-Reviewer: Andrew Sherman
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Tamas Mate
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Fri, 01 Dec 2023 14:54:53 +0000
Gerrit-HasComments: Yes
