[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files
aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r311756153 ## File path: core/src/main/java/org/apache/iceberg/DefaultModifyFiles.java ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Projections; + +public class DefaultModifyFiles extends MergingSnapshotProducer implements ModifyFiles { + + private Long baseSnapshotId; + private Expression rowFilter; + + DefaultModifyFiles(TableOperations ops, Long baseSnapshotId) { +super(ops); +this.baseSnapshotId = baseSnapshotId; +// modify files must fail if any of the deleted paths is missing and cannot be deleted +failMissingDeletePaths(); + } + + @Override + protected ModifyFiles self() { +return this; + } + + @Override + protected String operation() { +return DataOperations.MODIFY; + } + + @Override + public ModifyFiles modifyFiles(Set filesToDelete, Set filesToAdd) { +Preconditions.checkArgument(filesToDelete != null, "Files to delete cannot be null"); +Preconditions.checkArgument(filesToAdd != null, "Files to add cannot be null"); + +for (DataFile toDelete : filesToDelete) { + delete(toDelete.path()); +} + +for (DataFile toAdd : filesToAdd) { + add(toAdd); +} + +return this; + } + + @Override + public ModifyFiles failOnNewFiles(Expression newRowFilter) { +Preconditions.checkArgument(newRowFilter != null, "Row filter cannot be null"); +this.rowFilter = newRowFilter; +return this; + } + + @Override + public List apply(TableMetadata base) { +if (rowFilter != null) { + PartitionSpec spec = writeSpec(); + Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter); + Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr); + + List newFiles = collectNewFiles(base); + for (DataFile newFile : newFiles) { +// we do partition-level conflict resolution right now +// we can enhance it by leveraging column stats and MetricsEvaluator +ValidationException.check(!inclusive.eval(newFile.partition()), +"Modify operation detected a new file %s that might match %s", newFile.path(), rowFilter); + } +} + +return super.apply(base); + } + + private List collectNewFiles(TableMetadata meta) { +List newFiles = new ArrayList<>(); + +Long currentSnapshotId = meta.currentSnapshot() == null ? null : meta.currentSnapshot().snapshotId(); +while (currentSnapshotId != null && !currentSnapshotId.equals(baseSnapshotId)) { + Snapshot currentSnapshot = meta.snapshot(currentSnapshotId); + + if (currentSnapshot == null) { +throw new ValidationException( +"Modify operation cannot find snapshot %d. Was it expired?", currentSnapshotId); + } + + Iterables.addAll(newFiles, currentSnapshot.addedFiles()); Review comment: I believe we should fail the update if the file we are to delete is no longer there. Is [this](https://github.com/apache/incubator-iceberg/pull/351/files#diff-96ff93512d7be21e69f35e2bc96f03e9R316) use case what you mean? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files
aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r311501300 ## File path: core/src/main/java/org/apache/iceberg/DefaultModifyFiles.java ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Projections; + +public class DefaultModifyFiles extends MergingSnapshotProducer implements ModifyFiles { + + private Long baseSnapshotId; + private Expression rowFilter; + + DefaultModifyFiles(TableOperations ops, Long baseSnapshotId) { +super(ops); +this.baseSnapshotId = baseSnapshotId; +// modify files must fail if any of the deleted paths is missing and cannot be deleted +failMissingDeletePaths(); + } + + @Override + protected ModifyFiles self() { +return this; + } + + @Override + protected String operation() { +return DataOperations.MODIFY; + } + + @Override + public ModifyFiles modifyFiles(Set filesToDelete, Set filesToAdd) { +Preconditions.checkArgument(filesToDelete != null, "Files to delete cannot be null"); +Preconditions.checkArgument(filesToAdd != null, "Files to add cannot be null"); + +for (DataFile toDelete : filesToDelete) { + delete(toDelete.path()); +} + +for (DataFile toAdd : filesToAdd) { + add(toAdd); +} + +return this; + } + + @Override + public ModifyFiles failOnNewFiles(Expression newRowFilter) { +Preconditions.checkArgument(newRowFilter != null, "Row filter cannot be null"); +this.rowFilter = newRowFilter; +return this; + } + + @Override + public List apply(TableMetadata base) { +if (rowFilter != null) { + PartitionSpec spec = writeSpec(); + Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter); + Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr); + + List newFiles = collectNewFiles(base); + for (DataFile newFile : newFiles) { +// we do partition-level conflict resolution right now +// we can enhance it by leveraging column stats and MetricsEvaluator +ValidationException.check(!inclusive.eval(newFile.partition()), +"Modify operation detected a new file %s that might match %s", newFile.path(), rowFilter); + } +} + +return super.apply(base); + } + + private List collectNewFiles(TableMetadata meta) { +List newFiles = new ArrayList<>(); + +Long currentSnapshotId = meta.currentSnapshot() == null ? null : meta.currentSnapshot().snapshotId(); +while (currentSnapshotId != null && !currentSnapshotId.equals(baseSnapshotId)) { + Snapshot currentSnapshot = meta.snapshot(currentSnapshotId); + + if (currentSnapshot == null) { +throw new ValidationException( +"Modify operation cannot find snapshot %d. Was it expired?", currentSnapshotId); + } + + Iterables.addAll(newFiles, currentSnapshot.addedFiles()); Review comment: I'll do that in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files
aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310806685 ## File path: core/src/main/java/org/apache/iceberg/DefaultModifyFiles.java ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Projections; + +public class DefaultModifyFiles extends MergingSnapshotProducer implements ModifyFiles { + + private Long baseSnapshotId; + private Expression rowFilter; + + DefaultModifyFiles(TableOperations ops, Long baseSnapshotId) { +super(ops); +this.baseSnapshotId = baseSnapshotId; +// modify files must fail if any of the deleted paths is missing and cannot be deleted +failMissingDeletePaths(); + } + + @Override + protected ModifyFiles self() { +return this; + } + + @Override + protected String operation() { +return DataOperations.MODIFY; + } + + @Override + public ModifyFiles modifyFiles(Set filesToDelete, Set filesToAdd) { +Preconditions.checkArgument(filesToDelete != null, "Files to delete cannot be null"); +Preconditions.checkArgument(filesToAdd != null, "Files to add cannot be null"); + +for (DataFile toDelete : filesToDelete) { + delete(toDelete.path()); +} + +for (DataFile toAdd : filesToAdd) { + add(toAdd); +} + +return this; + } + + @Override + public ModifyFiles failOnNewFiles(Expression newRowFilter) { +Preconditions.checkArgument(newRowFilter != null, "Row filter cannot be null"); +this.rowFilter = newRowFilter; +return this; + } + + @Override + public List apply(TableMetadata base) { +if (rowFilter != null) { + PartitionSpec spec = writeSpec(); + Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter); + Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr); + + List newFiles = collectNewFiles(base); + for (DataFile newFile : newFiles) { +// we do partition-level conflict resolution right now +// we can enhance it by leveraging column stats and MetricsEvaluator +ValidationException.check(!inclusive.eval(newFile.partition()), +"Modify operation detected a new file %s that might match %s", newFile.path(), rowFilter); + } +} + +return super.apply(base); + } + + private List collectNewFiles(TableMetadata meta) { +List newFiles = new ArrayList<>(); + +Long currentSnapshotId = meta.currentSnapshot() == null ? null : meta.currentSnapshot().snapshotId(); +while (currentSnapshotId != null && !currentSnapshotId.equals(baseSnapshotId)) { + Snapshot currentSnapshot = meta.snapshot(currentSnapshotId); + + if (currentSnapshot == null) { +throw new ValidationException( +"Modify operation cannot find snapshot %d. Was it expired?", currentSnapshotId); + } + + Iterables.addAll(newFiles, currentSnapshot.addedFiles()); Review comment: `currentSnapshot.addedFiles()` doesn't return stats, so it will complicate stats-based conflict resolution. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files
aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310802612 ## File path: api/src/main/java/org/apache/iceberg/ModifyFiles.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; + +/** + * API for modifying files in a table. + * + * This API accumulates file additions and deletions, produces a new {@link Snapshot} of the + * changes, and commits that snapshot as the current. + * + * When committing, these changes will be applied to the latest table snapshot. Commit conflicts + * will be resolved by applying the changes to the new latest snapshot and reattempting the commit. + * + * Similarly to {@link RewriteFiles}, if any of the deleted files are no longer in the latest snapshot + * when reattempting, the commit will throw a {@link ValidationException}. + */ +public interface ModifyFiles extends SnapshotUpdate { + /** + * Add a modify that replaces one set of files with another set that might contain different data. + * + * @param filesToDelete files that will be replaced (deleted), cannot be null + * @param filesToAddfiles that will be added, cannot be null + * @return this for method chaining + */ + ModifyFiles modifyFiles(Set filesToDelete, Set filesToAdd); + + /** + * Enables validation of new files that are added concurrently after the modify operation starts. + * + * If another concurrent operation commits a new file that might contain rows matching + * the rowFilter expression, the modify operation will detect this during retries and fail. + * Setting a correct rowFilter is required to maintain serializable isolation. + * + * @param rowFilter an expression on rows in the table + * @return this for method chaining + */ + ModifyFiles failOnNewFiles(Expression rowFilter); Review comment: If we don't set a correct row filter, the isolation level will be snapshot isolation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files
aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310801433 ## File path: core/src/main/java/org/apache/iceberg/BaseTransaction.java ## @@ -158,6 +158,16 @@ public OverwriteFiles newOverwrite() { return overwrite; } + @Override + public ModifyFiles newModify() { +checkLastOperationCommitted("ModifyFiles"); +Long baseSnapshotId = currentId(base); Review comment: I am using the base snapshot id of the transaction here. I believe it should be safe even if the transaction retries as we still remember the original base snapshot id. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files
aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310801254 ## File path: api/src/main/java/org/apache/iceberg/ModifyFiles.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; + +/** + * API for modifying files in a table. + * + * This API accumulates file additions and deletions, produces a new {@link Snapshot} of the + * changes, and commits that snapshot as the current. + * + * When committing, these changes will be applied to the latest table snapshot. Commit conflicts + * will be resolved by applying the changes to the new latest snapshot and reattempting the commit. + * + * Similarly to {@link RewriteFiles}, if any of the deleted files are no longer in the latest snapshot + * when reattempting, the commit will throw a {@link ValidationException}. + */ +public interface ModifyFiles extends SnapshotUpdate { + /** + * Add a modify that replaces one set of files with another set that might contain different data. + * + * @param filesToDelete files that will be replaced (deleted), cannot be null + * @param filesToAddfiles that will be added, cannot be null + * @return this for method chaining + */ + ModifyFiles modifyFiles(Set filesToDelete, Set filesToAdd); + + /** + * Enables validation of new files that are added concurrently after the modify operation starts. + * + * If another concurrent operation commits a new file that might contain rows matching + * the rowFilter expression, the modify operation will detect this during retries and fail. + * Setting a correct rowFilter is required to maintain serializable isolation. + * + * @param rowFilter an expression on rows in the table + * @return this for method chaining + */ + ModifyFiles failOnNewFiles(Expression rowFilter); Review comment: Any ideas on a better name are welcome This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files
aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310801433 ## File path: core/src/main/java/org/apache/iceberg/BaseTransaction.java ## @@ -158,6 +158,16 @@ public OverwriteFiles newOverwrite() { return overwrite; } + @Override + public ModifyFiles newModify() { +checkLastOperationCommitted("ModifyFiles"); +Long baseSnapshotId = currentId(base); Review comment: I am using the base snapshot id of the transaction here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org