[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files

2019-08-07 Thread GitBox
aokolnychyi commented on a change in pull request #351: Provide an API to 
modify records within files
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r311756153
 
 

 ##
 File path: core/src/main/java/org/apache/iceberg/DefaultModifyFiles.java
 ##
 @@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Projections;
+
+public class DefaultModifyFiles extends MergingSnapshotProducer 
implements ModifyFiles {
+
+  private Long baseSnapshotId;
+  private Expression rowFilter;
+
+  DefaultModifyFiles(TableOperations ops, Long baseSnapshotId) {
+super(ops);
+this.baseSnapshotId = baseSnapshotId;
+// modify files must fail if any of the deleted paths is missing and 
cannot be deleted
+failMissingDeletePaths();
+  }
+
+  @Override
+  protected ModifyFiles self() {
+return this;
+  }
+
+  @Override
+  protected String operation() {
+return DataOperations.MODIFY;
+  }
+
+  @Override
+  public ModifyFiles modifyFiles(Set filesToDelete, Set 
filesToAdd) {
+Preconditions.checkArgument(filesToDelete != null, "Files to delete cannot 
be null");
+Preconditions.checkArgument(filesToAdd != null, "Files to add cannot be 
null");
+
+for (DataFile toDelete : filesToDelete) {
+  delete(toDelete.path());
+}
+
+for (DataFile toAdd : filesToAdd) {
+  add(toAdd);
+}
+
+return this;
+  }
+
+  @Override
+  public ModifyFiles failOnNewFiles(Expression newRowFilter) {
+Preconditions.checkArgument(newRowFilter != null, "Row filter cannot be 
null");
+this.rowFilter = newRowFilter;
+return this;
+  }
+
+  @Override
+  public List apply(TableMetadata base) {
+if (rowFilter != null) {
+  PartitionSpec spec = writeSpec();
+  Expression inclusiveExpr = 
Projections.inclusive(spec).project(rowFilter);
+  Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+
+  List newFiles = collectNewFiles(base);
+  for (DataFile newFile : newFiles) {
+// we do partition-level conflict resolution right now
+// we can enhance it by leveraging column stats and MetricsEvaluator
+ValidationException.check(!inclusive.eval(newFile.partition()),
+"Modify operation detected a new file %s that might match %s", 
newFile.path(), rowFilter);
+  }
+}
+
+return super.apply(base);
+  }
+
+  private List collectNewFiles(TableMetadata meta) {
+List newFiles = new ArrayList<>();
+
+Long currentSnapshotId = meta.currentSnapshot() == null ? null : 
meta.currentSnapshot().snapshotId();
+while (currentSnapshotId != null && 
!currentSnapshotId.equals(baseSnapshotId)) {
+  Snapshot currentSnapshot = meta.snapshot(currentSnapshotId);
+
+  if (currentSnapshot == null) {
+throw new ValidationException(
+"Modify operation cannot find snapshot %d. Was it expired?", 
currentSnapshotId);
+  }
+
+  Iterables.addAll(newFiles, currentSnapshot.addedFiles());
 
 Review comment:
   I believe we should fail the update if the file we are to delete is no 
longer there. Is 
[this](https://github.com/apache/incubator-iceberg/pull/351/files#diff-96ff93512d7be21e69f35e2bc96f03e9R316)
 use case what you mean?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files

2019-08-07 Thread GitBox
aokolnychyi commented on a change in pull request #351: Provide an API to 
modify records within files
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r311501300
 
 

 ##
 File path: core/src/main/java/org/apache/iceberg/DefaultModifyFiles.java
 ##
 @@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Projections;
+
+public class DefaultModifyFiles extends MergingSnapshotProducer 
implements ModifyFiles {
+
+  private Long baseSnapshotId;
+  private Expression rowFilter;
+
+  DefaultModifyFiles(TableOperations ops, Long baseSnapshotId) {
+super(ops);
+this.baseSnapshotId = baseSnapshotId;
+// modify files must fail if any of the deleted paths is missing and 
cannot be deleted
+failMissingDeletePaths();
+  }
+
+  @Override
+  protected ModifyFiles self() {
+return this;
+  }
+
+  @Override
+  protected String operation() {
+return DataOperations.MODIFY;
+  }
+
+  @Override
+  public ModifyFiles modifyFiles(Set filesToDelete, Set 
filesToAdd) {
+Preconditions.checkArgument(filesToDelete != null, "Files to delete cannot 
be null");
+Preconditions.checkArgument(filesToAdd != null, "Files to add cannot be 
null");
+
+for (DataFile toDelete : filesToDelete) {
+  delete(toDelete.path());
+}
+
+for (DataFile toAdd : filesToAdd) {
+  add(toAdd);
+}
+
+return this;
+  }
+
+  @Override
+  public ModifyFiles failOnNewFiles(Expression newRowFilter) {
+Preconditions.checkArgument(newRowFilter != null, "Row filter cannot be 
null");
+this.rowFilter = newRowFilter;
+return this;
+  }
+
+  @Override
+  public List apply(TableMetadata base) {
+if (rowFilter != null) {
+  PartitionSpec spec = writeSpec();
+  Expression inclusiveExpr = 
Projections.inclusive(spec).project(rowFilter);
+  Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+
+  List newFiles = collectNewFiles(base);
+  for (DataFile newFile : newFiles) {
+// we do partition-level conflict resolution right now
+// we can enhance it by leveraging column stats and MetricsEvaluator
+ValidationException.check(!inclusive.eval(newFile.partition()),
+"Modify operation detected a new file %s that might match %s", 
newFile.path(), rowFilter);
+  }
+}
+
+return super.apply(base);
+  }
+
+  private List collectNewFiles(TableMetadata meta) {
+List newFiles = new ArrayList<>();
+
+Long currentSnapshotId = meta.currentSnapshot() == null ? null : 
meta.currentSnapshot().snapshotId();
+while (currentSnapshotId != null && 
!currentSnapshotId.equals(baseSnapshotId)) {
+  Snapshot currentSnapshot = meta.snapshot(currentSnapshotId);
+
+  if (currentSnapshot == null) {
+throw new ValidationException(
+"Modify operation cannot find snapshot %d. Was it expired?", 
currentSnapshotId);
+  }
+
+  Iterables.addAll(newFiles, currentSnapshot.addedFiles());
 
 Review comment:
   I'll do that in a follow-up PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files

2019-08-05 Thread GitBox
aokolnychyi commented on a change in pull request #351: Provide an API to 
modify records within files
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310806685
 
 

 ##
 File path: core/src/main/java/org/apache/iceberg/DefaultModifyFiles.java
 ##
 @@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Projections;
+
+public class DefaultModifyFiles extends MergingSnapshotProducer 
implements ModifyFiles {
+
+  private Long baseSnapshotId;
+  private Expression rowFilter;
+
+  DefaultModifyFiles(TableOperations ops, Long baseSnapshotId) {
+super(ops);
+this.baseSnapshotId = baseSnapshotId;
+// modify files must fail if any of the deleted paths is missing and 
cannot be deleted
+failMissingDeletePaths();
+  }
+
+  @Override
+  protected ModifyFiles self() {
+return this;
+  }
+
+  @Override
+  protected String operation() {
+return DataOperations.MODIFY;
+  }
+
+  @Override
+  public ModifyFiles modifyFiles(Set filesToDelete, Set 
filesToAdd) {
+Preconditions.checkArgument(filesToDelete != null, "Files to delete cannot 
be null");
+Preconditions.checkArgument(filesToAdd != null, "Files to add cannot be 
null");
+
+for (DataFile toDelete : filesToDelete) {
+  delete(toDelete.path());
+}
+
+for (DataFile toAdd : filesToAdd) {
+  add(toAdd);
+}
+
+return this;
+  }
+
+  @Override
+  public ModifyFiles failOnNewFiles(Expression newRowFilter) {
+Preconditions.checkArgument(newRowFilter != null, "Row filter cannot be 
null");
+this.rowFilter = newRowFilter;
+return this;
+  }
+
+  @Override
+  public List apply(TableMetadata base) {
+if (rowFilter != null) {
+  PartitionSpec spec = writeSpec();
+  Expression inclusiveExpr = 
Projections.inclusive(spec).project(rowFilter);
+  Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+
+  List newFiles = collectNewFiles(base);
+  for (DataFile newFile : newFiles) {
+// we do partition-level conflict resolution right now
+// we can enhance it by leveraging column stats and MetricsEvaluator
+ValidationException.check(!inclusive.eval(newFile.partition()),
+"Modify operation detected a new file %s that might match %s", 
newFile.path(), rowFilter);
+  }
+}
+
+return super.apply(base);
+  }
+
+  private List collectNewFiles(TableMetadata meta) {
+List newFiles = new ArrayList<>();
+
+Long currentSnapshotId = meta.currentSnapshot() == null ? null : 
meta.currentSnapshot().snapshotId();
+while (currentSnapshotId != null && 
!currentSnapshotId.equals(baseSnapshotId)) {
+  Snapshot currentSnapshot = meta.snapshot(currentSnapshotId);
+
+  if (currentSnapshot == null) {
+throw new ValidationException(
+"Modify operation cannot find snapshot %d. Was it expired?", 
currentSnapshotId);
+  }
+
+  Iterables.addAll(newFiles, currentSnapshot.addedFiles());
 
 Review comment:
   `currentSnapshot.addedFiles()` doesn't return stats, so it will complicate 
stats-based conflict resolution.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files

2019-08-05 Thread GitBox
aokolnychyi commented on a change in pull request #351: Provide an API to 
modify records within files
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310802612
 
 

 ##
 File path: api/src/main/java/org/apache/iceberg/ModifyFiles.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+
+/**
+ * API for modifying files in a table.
+ * 
+ * This API accumulates file additions and deletions, produces a new {@link 
Snapshot} of the
+ * changes, and commits that snapshot as the current.
+ * 
+ * When committing, these changes will be applied to the latest table 
snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and 
reattempting the commit.
+ * 
+ * Similarly to {@link RewriteFiles}, if any of the deleted files are no 
longer in the latest snapshot
+ * when reattempting, the commit will throw a {@link ValidationException}.
+ */
+public interface ModifyFiles extends SnapshotUpdate {
+  /**
+   * Add a modify that replaces one set of files with another set that might 
contain different data.
+   *
+   * @param filesToDelete files that will be replaced (deleted), cannot be null
+   * @param filesToAddfiles that will be added, cannot be null
+   * @return this for method chaining
+   */
+  ModifyFiles modifyFiles(Set filesToDelete, Set 
filesToAdd);
+
+  /**
+   * Enables validation of new files that are added concurrently after the 
modify operation starts.
+   *
+   * If another concurrent operation commits a new file that might contain 
rows matching
+   * the rowFilter expression, the modify operation will detect this during 
retries and fail.
+   * Setting a correct rowFilter is required to maintain serializable 
isolation.
+   *
+   * @param rowFilter an expression on rows in the table
+   * @return this for method chaining
+   */
+  ModifyFiles failOnNewFiles(Expression rowFilter);
 
 Review comment:
   If we don't set a correct row filter, the isolation level will be snapshot 
isolation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files

2019-08-05 Thread GitBox
aokolnychyi commented on a change in pull request #351: Provide an API to 
modify records within files
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310801433
 
 

 ##
 File path: core/src/main/java/org/apache/iceberg/BaseTransaction.java
 ##
 @@ -158,6 +158,16 @@ public OverwriteFiles newOverwrite() {
 return overwrite;
   }
 
+  @Override
+  public ModifyFiles newModify() {
+checkLastOperationCommitted("ModifyFiles");
+Long baseSnapshotId = currentId(base);
 
 Review comment:
   I am using the base snapshot id of the transaction here. I believe it should 
be safe even if the transaction retries as we still remember the original base 
snapshot id.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files

2019-08-05 Thread GitBox
aokolnychyi commented on a change in pull request #351: Provide an API to 
modify records within files
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310801254
 
 

 ##
 File path: api/src/main/java/org/apache/iceberg/ModifyFiles.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+
+/**
+ * API for modifying files in a table.
+ * 
+ * This API accumulates file additions and deletions, produces a new {@link 
Snapshot} of the
+ * changes, and commits that snapshot as the current.
+ * 
+ * When committing, these changes will be applied to the latest table 
snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and 
reattempting the commit.
+ * 
+ * Similarly to {@link RewriteFiles}, if any of the deleted files are no 
longer in the latest snapshot
+ * when reattempting, the commit will throw a {@link ValidationException}.
+ */
+public interface ModifyFiles extends SnapshotUpdate {
+  /**
+   * Add a modify that replaces one set of files with another set that might 
contain different data.
+   *
+   * @param filesToDelete files that will be replaced (deleted), cannot be null
+   * @param filesToAddfiles that will be added, cannot be null
+   * @return this for method chaining
+   */
+  ModifyFiles modifyFiles(Set filesToDelete, Set 
filesToAdd);
+
+  /**
+   * Enables validation of new files that are added concurrently after the 
modify operation starts.
+   *
+   * If another concurrent operation commits a new file that might contain 
rows matching
+   * the rowFilter expression, the modify operation will detect this during 
retries and fail.
+   * Setting a correct rowFilter is required to maintain serializable 
isolation.
+   *
+   * @param rowFilter an expression on rows in the table
+   * @return this for method chaining
+   */
+  ModifyFiles failOnNewFiles(Expression rowFilter);
 
 Review comment:
   Any ideas on a better name are welcome


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] aokolnychyi commented on a change in pull request #351: Provide an API to modify records within files

2019-08-05 Thread GitBox
aokolnychyi commented on a change in pull request #351: Provide an API to 
modify records within files
URL: https://github.com/apache/incubator-iceberg/pull/351#discussion_r310801433
 
 

 ##
 File path: core/src/main/java/org/apache/iceberg/BaseTransaction.java
 ##
 @@ -158,6 +158,16 @@ public OverwriteFiles newOverwrite() {
 return overwrite;
   }
 
+  @Override
+  public ModifyFiles newModify() {
+checkLastOperationCommitted("ModifyFiles");
+Long baseSnapshotId = currentId(base);
 
 Review comment:
   I am using the base snapshot id of the transaction here


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org