[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r317793406

## File path: api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java ##
@@ -52,7 +52,7 @@ private MetricsEvalVisitor visitor() {
     return visitors.get();
   }
 
-  InclusiveMetricsEvaluator(Schema schema, Expression unbound) {
+  public InclusiveMetricsEvaluator(Schema schema, Expression unbound) {

Review comment:
Let's address this in #413 together with other places.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r317398734

## File path: core/src/main/java/org/apache/iceberg/OverwriteData.java ##
@@ -88,6 +119,42 @@ public OverwriteFiles validateAddedFiles() {
       }
     }
 
+    if (conflictDetectionFilter != null) {
+      PartitionSpec spec = writeSpec();
+      Expression inclusiveExpr = Projections.inclusive(spec).project(conflictDetectionFilter);
+      Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+
+      InclusiveMetricsEvaluator metrics = new InclusiveMetricsEvaluator(base.schema(), conflictDetectionFilter);
+
+      List<DataFile> newFiles = collectNewFiles(base);
+      for (DataFile newFile : newFiles) {
+        ValidationException.check(
+            !inclusive.eval(newFile.partition()) || !metrics.eval(newFile),
+            "A conflicting file was appended that matches filter '%s': %s",

Review comment:
You are right, that's exactly the behavior we wanted to achieve. If the file can potentially contain data that matches the filter, we should fail. I should update the error message.
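To make the "fail if the file can potentially contain matching data" rule concrete, here is a minimal, self-contained sketch of the same boolean shape as the check in the diff. Everything in it is a hypothetical stand-in (`FileInfo`, `Filter`, `passesValidation`, `firstConflict`); it does not use Iceberg's `Evaluator` or `InclusiveMetricsEvaluator`, and the toy filter (one partition value plus one `id` value) is an assumption chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are Iceberg classes.
final class ConflictCheckSketch {

  /** Toy data file: one partition value plus min/max bounds for an "id" column. */
  static final class FileInfo {
    final String path;
    final String partition;
    final long minId;
    final long maxId;

    FileInfo(String path, String partition, long minId, long maxId) {
      this.path = path;
      this.partition = partition;
      this.minId = minId;
      this.maxId = maxId;
    }
  }

  /** Toy filter meaning: partition == day AND id == targetId. */
  static final class Filter {
    final String day;
    final long targetId;

    Filter(String day, long targetId) {
      this.day = day;
      this.targetId = targetId;
    }

    // Partition-level test: true unless the partition value rules out a match.
    boolean partitionMayMatch(FileInfo file) {
      return day.equals(file.partition);
    }

    // Metrics-level test: true unless the column bounds rule out a match.
    boolean metricsMayMatch(FileInfo file) {
      return file.minId <= targetId && targetId <= file.maxId;
    }
  }

  // Same boolean shape as the check in the diff: validation passes when at least
  // one evaluator proves that the file cannot contain matching rows.
  static boolean passesValidation(Filter filter, FileInfo file) {
    return !filter.partitionMayMatch(file) || !filter.metricsMayMatch(file);
  }

  // Returns the path of the first file that might conflict, or null if none does.
  static String firstConflict(Filter filter, List<FileInfo> newFiles) {
    for (FileInfo file : newFiles) {
      if (!passesValidation(filter, file)) {
        return file.path;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Filter filter = new Filter("2019-08-26", 42L);
    List<FileInfo> newFiles = Arrays.asList(
        new FileInfo("other-day.parquet", "2019-08-25", 0L, 100L),
        new FileInfo("disjoint-ids.parquet", "2019-08-26", 100L, 200L),
        new FileInfo("overlapping.parquet", "2019-08-26", 10L, 50L));
    System.out.println("First possible conflict: " + firstConflict(filter, newFiles));
  }
}
```

Both tests are inclusive, so a file is only cleared when one of them can prove that no row matches. A file that merely might match is treated as a conflict, which is why wording such as "matches filter" in the error message overstates what was actually established.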
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r317398659

## File path: core/src/main/java/org/apache/iceberg/OverwriteData.java ##
@@ -88,6 +119,42 @@ public OverwriteFiles validateAddedFiles() {
       }
     }
 
+    if (conflictDetectionFilter != null) {
+      PartitionSpec spec = writeSpec();
+      Expression inclusiveExpr = Projections.inclusive(spec).project(conflictDetectionFilter);
+      Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+
+      InclusiveMetricsEvaluator metrics = new InclusiveMetricsEvaluator(base.schema(), conflictDetectionFilter);
+
+      List<DataFile> newFiles = collectNewFiles(base);
+      for (DataFile newFile : newFiles) {
+        ValidationException.check(
+            !inclusive.eval(newFile.partition()) || !metrics.eval(newFile),
+            "A conflicting file was appended that matches filter '%s': %s",
+            conflictDetectionFilter, newFile.path());
+      }
+    }
+
     return super.apply(base);
   }
+
+  private List<DataFile> collectNewFiles(TableMetadata meta) {
+    List<DataFile> newFiles = new ArrayList<>();
+
+    Long currentSnapshotId = meta.currentSnapshot() == null ? null : meta.currentSnapshot().snapshotId();

Review comment:
Not sure I got it. If the current snapshot is null, it means the table is still empty and shouldn't have any new files, right?
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r317398607

## File path: core/src/main/java/org/apache/iceberg/OverwriteData.java ##
@@ -88,6 +119,42 @@ public OverwriteFiles validateAddedFiles() {
       }
     }
 
+    if (conflictDetectionFilter != null) {
+      PartitionSpec spec = writeSpec();
+      Expression inclusiveExpr = Projections.inclusive(spec).project(conflictDetectionFilter);
+      Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+
+      InclusiveMetricsEvaluator metrics = new InclusiveMetricsEvaluator(base.schema(), conflictDetectionFilter);
+
+      List<DataFile> newFiles = collectNewFiles(base);
+      for (DataFile newFile : newFiles) {
+        ValidationException.check(
+            !inclusive.eval(newFile.partition()) || !metrics.eval(newFile),
+            "A conflicting file was appended that matches filter '%s': %s",
+            conflictDetectionFilter, newFile.path());
+      }
+    }
+
     return super.apply(base);
   }
+
+  private List<DataFile> collectNewFiles(TableMetadata meta) {
+    List<DataFile> newFiles = new ArrayList<>();
+
+    Long currentSnapshotId = meta.currentSnapshot() == null ? null : meta.currentSnapshot().snapshotId();
+    while (currentSnapshotId != null && !currentSnapshotId.equals(readSnapshotId)) {

Review comment:
This part is only invoked if we call `validateNoConflictingAppends`, which always accepts a valid `readSnapshotId` and an expression for detecting conflicts. If `validateNoConflictingAppends` is not called, there will be no validation and the behavior will be as it was before this change. We will call `validateNoConflictingAppends(null, expr)` only if we start with an empty table, meaning that we have to scan the entire history.

We can consider setting `readSnapshotId` and `conflictDetectionFilter` independently, but I am not sure whether it gives us any benefits. Also, it will lead to correctness errors if we don't set `readSnapshotId` in transactions correctly.
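The effect of a `null` read snapshot on the loop condition can be seen in a small standalone sketch. The `Snap` class, the parent-pointer traversal, and the choice to collect every snapshot's added files are assumptions made for illustration; the diff above only shows the loop header, not how the real `collectNewFiles` advances or which snapshots it inspects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a snapshot chain; not Iceberg's Snapshot or TableMetadata.
final class SnapshotWalkSketch {

  static final class Snap {
    final long id;
    final Long parentId; // null for the first snapshot of the table
    final List<String> addedFiles;

    Snap(long id, Long parentId, List<String> addedFiles) {
      this.id = id;
      this.parentId = parentId;
      this.addedFiles = addedFiles;
    }
  }

  // Walks from the current snapshot back towards the read snapshot and collects
  // the files added in between. A null readSnapshotId never equals a real id,
  // so the walk covers the entire history, as described for empty tables.
  static List<String> collectNewFiles(
      Map<Long, Snap> snapshots, Long currentSnapshotId, Long readSnapshotId) {
    List<String> newFiles = new ArrayList<>();
    Long id = currentSnapshotId;
    while (id != null && !id.equals(readSnapshotId)) {
      Snap snap = snapshots.get(id);
      if (snap == null) {
        // Assumed handling: the chain is broken, so conflicts cannot be ruled out.
        throw new IllegalStateException("Cannot find snapshot " + id);
      }
      newFiles.addAll(snap.addedFiles);
      id = snap.parentId;
    }
    return newFiles;
  }

  public static void main(String[] args) {
    Map<Long, Snap> snapshots = new HashMap<>();
    snapshots.put(1L, new Snap(1L, null, Arrays.asList("a.parquet")));
    snapshots.put(2L, new Snap(2L, 1L, Arrays.asList("b.parquet")));
    snapshots.put(3L, new Snap(3L, 2L, Arrays.asList("c.parquet")));

    System.out.println("Read at 1: " + collectNewFiles(snapshots, 3L, 1L));
    System.out.println("Read on empty table: " + collectNewFiles(snapshots, 3L, null));
  }
}
```

With this shape, an empty table on both sides (current snapshot `null`, read snapshot `null`) yields no files and therefore no conflicts, while a `null` read snapshot against a non-empty table forces a full scan of the history.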
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r315093153

## File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java ##
@@ -74,5 +85,23 @@
    *
    * @return this for method chaining
    */
-  OverwriteFiles validateAddedFiles();
+  OverwriteFiles validateAddedFilesMatchOverwriteFilter();
+
+  /**
+   * Enables validation that files added concurrently do not conflict with this commit's operation.
+   *
+   * This method should be called when the table is queried to determine which files to delete/append.
+   * If a concurrent operation commits a new file after the data was read and that file might
+   * contain rows matching the specified conflict detection filter, the overwrite operation
+   * will detect this during retries and fail.
+   *
+   * Calling this method with a correct conflict detection filter is required to maintain

Review comment:
We can also consider making the isolation level a table property and validating it here. I can think of multiple operations where this will be applicable: compaction, update, delete, merge into. However, I am not sure we want the same isolation level for all operations. Compaction is not serializable in its current form: we don't check files added concurrently (which seems correct to me).

Maybe the isolation level shouldn't be a table property but rather a config of the operation itself. That way, compaction will be using snapshot isolation and update/deletes might be serializable.

@rdblue what are your thoughts?
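The "config of the operation itself" idea could look roughly like the sketch below. All names here (`IsolationLevel`, `Operation`, `IsolationConfigSketch`, `mustValidateConcurrentAppends`) are hypothetical and nothing like this exists in the pull request. Defaulting to serializable when an operation has no explicit setting is also an assumption of the sketch, not something proposed in the thread.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical per-operation isolation config; not an Iceberg API.
final class IsolationConfigSketch {

  enum IsolationLevel { SNAPSHOT, SERIALIZABLE }

  enum Operation { COMPACTION, UPDATE, DELETE, MERGE_INTO }

  private final Map<Operation, IsolationLevel> levels = new EnumMap<>(Operation.class);

  // Sets the isolation level for a single operation type.
  IsolationConfigSketch set(Operation operation, IsolationLevel level) {
    levels.put(operation, level);
    return this;
  }

  // Assumption: operations without an explicit level use the stricter one.
  IsolationLevel levelFor(Operation operation) {
    return levels.getOrDefault(operation, IsolationLevel.SERIALIZABLE);
  }

  // Only serializable operations need to check files appended concurrently.
  boolean mustValidateConcurrentAppends(Operation operation) {
    return levelFor(operation) == IsolationLevel.SERIALIZABLE;
  }

  public static void main(String[] args) {
    IsolationConfigSketch config = new IsolationConfigSketch()
        .set(Operation.COMPACTION, IsolationLevel.SNAPSHOT)
        .set(Operation.UPDATE, IsolationLevel.SERIALIZABLE);

    for (Operation operation : Operation.values()) {
      System.out.println(operation + " validates concurrent appends: "
          + config.mustValidateConcurrentAppends(operation));
    }
  }
}
```

Keeping the level per operation lets compaction skip the concurrent-append check while updates and deletes keep it, which is the split the comment describes. A single table-wide property would force one choice on all of them.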
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r315093153

## File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java ##
@@ -74,5 +85,23 @@
    *
    * @return this for method chaining
    */
-  OverwriteFiles validateAddedFiles();
+  OverwriteFiles validateAddedFilesMatchOverwriteFilter();
+
+  /**
+   * Enables validation that files added concurrently do not conflict with this commit's operation.
+   *
+   * This method should be called when the table is queried to determine which files to delete/append.
+   * If a concurrent operation commits a new file after the data was read and that file might
+   * contain rows matching the specified conflict detection filter, the overwrite operation
+   * will detect this during retries and fail.
+   *
+   * Calling this method with a correct conflict detection filter is required to maintain

Review comment:
We can also consider making the isolation level a table property and validating it here. I can think of multiple operations where this will be applicable: compaction, update, delete, merge into. However, I am not sure we want the same isolation level for all operations. Compaction is not serializable in its current form: we don't check files added concurrently (which seems correct to me).

Maybe the isolation level shouldn't be a table property but rather a config of the operation itself. That way, compaction will be using snapshot isolation and update/deletes might require serializable.
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r315093153

## File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java ##
@@ -74,5 +85,23 @@
    *
    * @return this for method chaining
    */
-  OverwriteFiles validateAddedFiles();
+  OverwriteFiles validateAddedFilesMatchOverwriteFilter();
+
+  /**
+   * Enables validation that files added concurrently do not conflict with this commit's operation.
+   *
+   * This method should be called when the table is queried to determine which files to delete/append.
+   * If a concurrent operation commits a new file after the data was read and that file might
+   * contain rows matching the specified conflict detection filter, the overwrite operation
+   * will detect this during retries and fail.
+   *
+   * Calling this method with a correct conflict detection filter is required to maintain

Review comment:
We can also consider making the isolation level a table property and validating it here. Maybe, in the future, we will have more operations where the isolation level can be lower to allow better performance. Having said that, if such a property exists, it has to be respected by all operations.
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r315093153

## File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java ##
@@ -74,5 +85,23 @@
    *
    * @return this for method chaining
    */
-  OverwriteFiles validateAddedFiles();
+  OverwriteFiles validateAddedFilesMatchOverwriteFilter();
+
+  /**
+   * Enables validation that files added concurrently do not conflict with this commit's operation.
+   *
+   * This method should be called when the table is queried to determine which files to delete/append.
+   * If a concurrent operation commits a new file after the data was read and that file might
+   * contain rows matching the specified conflict detection filter, the overwrite operation
+   * will detect this during retries and fail.
+   *
+   * Calling this method with a correct conflict detection filter is required to maintain

Review comment:
We can also consider making the isolation level a table property and validating it here.
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r314236321

## File path: api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java ##
@@ -52,7 +52,7 @@ private MetricsEvalVisitor visitor() {
     return visitors.get();
   }
 
-  InclusiveMetricsEvaluator(Schema schema, Expression unbound) {
+  public InclusiveMetricsEvaluator(Schema schema, Expression unbound) {

Review comment:
TODO: this is temporary until we figure out case sensitivity for metrics evaluators.
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r314236135

## File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java ##
@@ -24,18 +24,17 @@
 import org.apache.iceberg.expressions.Projections;
 
 /**
- * API for overwriting files in a table by filter expression.
+ * API for overwriting files in a table.
  *
  * This API accumulates file additions and produces a new {@link Snapshot} of the table by replacing
- * all the files that match the filter expression with the set of additions. This operation is used
- * to implement idempotent writes that always replace a section of a table with new data.
+ * all the deleted files with the set of additions. This operation is used to implement idempotent
+ * writes that always replace a section of a table with new data or update/delete operations that
+ * eagerly overwrite files.

Review comment:
Added this below.
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r314234844

## File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java ##
@@ -74,5 +90,22 @@
    *
    * @return this for method chaining
    */
-  OverwriteFiles validateAddedFiles();
+  OverwriteFiles validateAddedFilesMatchRowFilter();
+
+  /**
+   * Enables validation of files that are added concurrently.
+   *
+   * This method should be called when the table is queried to determine files to delete/append.
+   * If a concurrent operation commits a new file after the data was read and that file might
+   * contain rows matching the specified row filter, the overwrite operation will detect
+   * this during retries and fail.
+   *
+   * Calling this method with a correct row filter is required to maintain serializable isolation.
+   * Otherwise, the isolation level will be snapshot isolation.
+   *
+   * @param readSnapshotId the snapshot id that was used to read the data
+   * @param rowFilter an expression on rows in the table
+   * @return this for method chaining
+   */
+  OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression rowFilter);

Review comment:
Allowing `null` was intentional to handle cases when the table is empty.
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
aokolnychyi commented on a change in pull request #351: Extend Iceberg with a way to overwrite files for eager updates/deletes
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r314233769

## File path: core/src/main/java/org/apache/iceberg/OverwriteData.java ##
@@ -88,6 +121,40 @@ public OverwriteFiles validateAddedFiles() {
       }
     }
 
+    if (conflictingAppendsRowFilter != null) {
+      PartitionSpec spec = writeSpec();
+      Expression inclusiveExpr = Projections.inclusive(spec).project(conflictingAppendsRowFilter);
+      Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+
+      List<DataFile> newFiles = collectNewFiles(base);
+      for (DataFile newFile : newFiles) {
+        // we do partition-level conflict resolution right now
+        // we can enhance it by leveraging column stats and MetricsEvaluator
+        ValidationException.check(
+            !inclusive.eval(newFile.partition()),
+            "A conflicting file was appended that matches filter '%s': %s",
+            conflictingAppendsRowFilter, newFile.path());
+      }
+    }
+
     return super.apply(base);
   }
+
+  private List<DataFile> collectNewFiles(TableMetadata meta) {
+    List<DataFile> newFiles = new ArrayList<>();
+
+    Long currentSnapshotId = meta.currentSnapshot() == null ? null : meta.currentSnapshot().snapshotId();
+    while (currentSnapshotId != null && !currentSnapshotId.equals(readSnapshotId)) {
+      Snapshot currentSnapshot = meta.snapshot(currentSnapshotId);
+
+      if (currentSnapshot == null) {
+        throw new ValidationException("Cannot find snapshot %d. Was it expired?", currentSnapshotId);

Review comment:
I think `meta.currentSnapshot` can be `null` if the overwrite happens on an empty table. In that case, the read snapshot is also `null`, so it should work correctly. We have a test for this as well. Such a situation might happen if we use this API for eager `MERGE INTO` operations.

Will update the error message.