Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-05-16 Thread via GitHub


qzyu999 commented on PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#issuecomment-4468114397

   As mentioned in #3319, I would like to delay merging this PR (#3131) 
until #3320 is complete. A `_validate_concurrency()` function is being 
developed there, and once it lands it would make sense to integrate it into 
`_RewriteFiles` so that all conflict validations run before a commit is 
attempted. It is technically possible to do this now, but it makes more sense 
to refactor `_RewriteFiles` to use `_validate_concurrency()` once #3320 is 
complete.
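
A toy sketch of the ordering described here, where validation runs first and the commit only happens if every check passes. Every name below (`RewriteSketch`, `files_still_present`, `validate_concurrency`) is hypothetical, and plain sets of file paths stand in for table state. The actual `_validate_concurrency()` from #3320 may look quite different.

```python
from dataclasses import dataclass, field
from typing import Callable

# Hypothetical check signature: (files currently in the table, files we want to delete).
Validation = Callable[[set[str], set[str]], None]


def files_still_present(current_files: set[str], files_to_delete: set[str]) -> None:
    """Fail if a concurrent commit already removed a file we plan to rewrite."""
    missing = files_to_delete - current_files
    if missing:
        raise ValueError(f"Files removed by a concurrent commit: {sorted(missing)}")


@dataclass
class RewriteSketch:
    """Illustrative stand-in for a rewrite producer, not PyIceberg code."""

    files_to_delete: set[str]
    files_to_add: set[str]
    validations: list[Validation] = field(default_factory=lambda: [files_still_present])

    def validate_concurrency(self, current_files: set[str]) -> None:
        # Run every registered check; the first failure aborts the commit.
        for check in self.validations:
            check(current_files, self.files_to_delete)

    def commit(self, current_files: set[str]) -> set[str]:
        # Validate against the latest table state before producing the new state.
        self.validate_concurrency(current_files)
        return (current_files - self.files_to_delete) | self.files_to_add
```

Keeping the checks in a list means further conflict validations could be added without touching `commit`.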


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-16 Thread via GitHub


qzyu999 commented on PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#issuecomment-4263533082

   Hi @geruh, thanks again for the helpful feedback. I've responded to each of 
your review comments and updated the code accordingly, with the exception of 
the note about the rewrite/replace naming convention, which is pending a 
response from @kevinjqliu. PTAL.





Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-16 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3096467745


##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +669,70 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
+        super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to rewrite
+        if self._deleted_data_files or self._added_data_files:
+            # Grab the entries that we actually found in the table's manifests
+            deleted_entries = self._deleted_entries()
+            found_deleted_files = {entry.data_file for entry in deleted_entries}
+
+            # If the user asked to delete files that aren't in the table, abort.
+            if len(found_deleted_files) != len(self._deleted_data_files):
+                raise ValueError("Cannot delete files that are not present in the table")
+
+            added_records = sum(f.record_count for f in self._added_data_files)
+            deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)
+
+            if added_records > deleted_records:
+                raise ValueError(f"Invalid replace: records added ({added_records}) exceeds records removed ({deleted_records})")
+
+            return super()._commit()
+        else:
+            return (), ()
+
+    @cached_property
+    def _cached_deleted_entries(self) -> list[ManifestEntry]:
+        """Check if we need to mark the files as deleted."""
+        if self._parent_snapshot_id is not None:
+            previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
+            if previous_snapshot is None:
+                raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")
+
+            executor = ExecutorFactory.get_or_create()
+
+            def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
+                return [
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.DELETED,
+                        snapshot_id=self.snapshot_id,
+                        sequence_number=entry.sequence_number,
+                        file_sequence_number=entry.file_sequence_number,
+                        data_file=entry.data_file,
+                    )
+                    for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True)
+                    if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files
+                ]
+
+            list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
+            return list(itertools.chain(*list_of_entries))
+        else:
+            return []
+
+    def _deleted_entries(self) -> list[ManifestEntry]:
+        """Check if we need to mark the files as deleted."""
+        return self._cached_deleted_entries
+
+    def _existing_manifests(self) -> list[ManifestFile]:
+        """To determine if there are any existing manifests."""
+        return self._get_existing_manifests()

Review Comment:
   Hi @geruh, thanks for pointing this out. The docstrings in `_RewriteFiles` 
have been removed in c60d5ad62511b5db8d8f7e1eca98af50e2aee3f4.






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-16 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3096424562


##
pyiceberg/table/update/snapshot.py:
##
@@ -165,6 +165,50 @@ def _calculate_added_rows(self, manifests: list[ManifestFile]) -> int:
                 added_rows += manifest.added_rows_count
         return added_rows
 
+    def _get_existing_manifests(self) -> list[ManifestFile]:
+        """Filter existing manifests and rewrite those containing deleted data files."""
+        existing_files: list[ManifestFile] = []
+        # Use manifest pruning if a predicate is set (primarily for Overwrite)
+        manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
+
+        if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
+            for manifest_file in snapshot.manifests(io=self._io):
+                # Skip pruning for rewrite operations unless we want to optimize later
+                if self._operation == Operation.OVERWRITE and not manifest_evaluators[manifest_file.partition_spec_id](

Review Comment:
   Hi @geruh, great catch. I agree that the base class shouldn't be coupled to 
specific child operations. I've refactored `_get_existing_manifests` to accept 
a `should_use_manifest_pruning` flag so that child classes can explicitly 
opt in. The updates are in 8f1f9b949ebcc83e99ccc80e0b3d9fda050a096d.
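
For readers skimming the thread, the opt-in shape can be pictured with a toy sketch. Only the `should_use_manifest_pruning` flag name comes from the comment above. The classes, the dict-based manifests and the `might_match` callable are hypothetical stand-ins, and the code in the referenced commit may differ.

```python
from typing import Callable

# Toy manifest: {"path": str, "files": list[str]} instead of a real ManifestFile.
Manifest = dict


class ProducerSketch:
    """Illustrative base class; it never inspects which operation is running."""

    def __init__(self, manifests: list[Manifest], deleted: set[str], might_match: Callable[[Manifest], bool]):
        self._manifests = manifests
        self._deleted = deleted
        self._might_match = might_match  # stand-in for a manifest evaluator
        self.opened: list[str] = []  # records which manifests had to be read

    def _get_existing_manifests(self, should_use_manifest_pruning: bool = False) -> list[Manifest]:
        existing: list[Manifest] = []
        for manifest in self._manifests:
            if should_use_manifest_pruning and not self._might_match(manifest):
                # Pruned: cannot contain affected files, so keep it without opening it.
                existing.append(manifest)
                continue
            self.opened.append(manifest["path"])
            remaining = [f for f in manifest["files"] if f not in self._deleted]
            if remaining:
                existing.append({"path": manifest["path"], "files": remaining})
        return existing


class OverwriteSketch(ProducerSketch):
    def _existing_manifests(self) -> list[Manifest]:
        # Overwrite opts in to pruning explicitly.
        return self._get_existing_manifests(should_use_manifest_pruning=True)


class RewriteProducerSketch(ProducerSketch):
    def _existing_manifests(self) -> list[Manifest]:
        # Rewrite keeps the default and reads every manifest.
        return self._get_existing_manifests()
```

With this shape the base class no longer branches on the operation type. Each child states its own choice at the call site.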






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-16 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3096281892


##
tests/table/test_replace.py:
##
@@ -0,0 +1,637 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import cast
+
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.manifest import (
+    DataFile,
+    DataFileContent,
+    FileFormat,
+    ManifestEntry,
+    ManifestEntryStatus,
+)
+from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation, Snapshot, Summary
+from pyiceberg.typedef import Record
+
+
+def test_replace_internally(catalog: Catalog) -> None:
+    # Setup a basic table using the catalog fixture
+    catalog.create_namespace("default")
+    table = catalog.create_table(
+        identifier="default.test_replace",
+        schema=Schema(),
+    )
+
+    # 1. File we will delete
+    file_to_delete = DataFile.from_args(

Review Comment:
   Hi @geruh, great idea, this has been implemented in 
9681ec3369e7b605d98d3a87027a0c434ef0ccc2.






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-16 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3096190054


##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +669,70 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
+        super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)

Review Comment:
   Hi @geruh, the changes have been made in 
c3570d82c8cc974e5928fb9d1efa1e1a4bc0cdb1 as part of adding `branch` as an arg 
for `_RewriteFiles`.






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-16 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3096185437


##
pyiceberg/table/update/snapshot.py:
##
@@ -724,6 +790,14 @@ def delete(self) -> _DeleteFiles:
             snapshot_properties=self._snapshot_properties,
         )
 
+    def replace(self) -> _RewriteFiles:
+        return _RewriteFiles(
+            operation=Operation.REPLACE,
+            transaction=self._transaction,

Review Comment:
   Hi @geruh, thanks for noticing, this was an oversight. The changes have been 
made in c3570d82c8cc974e5928fb9d1efa1e1a4bc0cdb1 and a test has been added for 
this.






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-16 Thread via GitHub


geruh commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3095873141


##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +667,96 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
+        super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to rewrite
+        if self._deleted_data_files or self._added_data_files:
+            # Grab the entries that we actually found in the table's manifests
+            deleted_entries = self._deleted_entries()
+            found_deleted_files = {entry.data_file for entry in deleted_entries}
+
+            # If the user asked to delete files that aren't in the table, abort.
+            if len(found_deleted_files) != len(self._deleted_data_files):
+                raise ValueError("Cannot delete files that are not present in the table")
+
+            added_records = sum(f.record_count for f in self._added_data_files)
+            deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)
+
+            if added_records > deleted_records:

Review Comment:
   Okay, let's keep the check. I took a deeper look at the snapshot producer on 
the Java side, so let's align more closely with that: 
   
   
https://github.com/apache/iceberg/blob/dde712ec9ed6c9d28183ee4615d50f97b246af5d/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L322-L334






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-16 Thread via GitHub


geruh commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3095389329


##
pyiceberg/table/update/snapshot.py:
##
@@ -724,6 +790,14 @@ def delete(self) -> _DeleteFiles:
             snapshot_properties=self._snapshot_properties,
         )
 
+    def replace(self) -> _RewriteFiles:
+        return _RewriteFiles(
+            operation=Operation.REPLACE,
+            transaction=self._transaction,

Review Comment:
   I noticed that `branch` is missing here. Is there a reason for that?



##
pyiceberg/table/update/snapshot.py:
##
@@ -724,6 +814,14 @@ def delete(self) -> _DeleteFiles:
             snapshot_properties=self._snapshot_properties,
         )
 
+    def replace(self) -> _RewriteFiles:
+        return _RewriteFiles(

Review Comment:
   Yeah, there is a bit of a distinction here: rewrite is basically the 
rewrite of data files, while replace is the logical change to your snapshot 
metadata. My thinking is that users of the Java API today are used to 
interacting with it through:
   
   ```java
   table.newRewrite()
       .deleteFile(oldFile)
       .addFile(newFile)
       .commit();
   ```
   So someone coming from Java Iceberg will look for rewrite, not replace. But 
ultimately there may be more history behind this naming convention that I'm 
missing.
   
   WDYT @kevinjqliu?
   



##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +669,70 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
+        super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to rewrite
+        if self._deleted_data_files or self._added_data_files:
+            # Grab the entries that we actually found in the table's manifests
+            deleted_entries = self._deleted_entries()
+            found_deleted_files = {entry.data_file for entry in deleted_entries}
+
+            # If the user asked to delete files that aren't in the table, abort.
+            if len(found_deleted_files) != len(self._deleted_data_files):
+                raise ValueError("Cannot delete files that are not present in the table")
+
+            added_records = sum(f.record_count for f in self._added_data_files)
+            deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)
+
+            if added_records > deleted_records:
+                raise ValueError(f"Invalid replace: records added ({added_records}) exceeds records removed ({deleted_records})")
+
+            return super()._commit()
+        else:
+            return (), ()
+
+    @cached_property
+    def _cached_deleted_entries(self) -> list[ManifestEntry]:
+        """Check if we need to mark the files as deleted."""
+        if self._parent_snapshot_id is not None:
+            previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
+            if previous_snapshot is None:
+                raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")
+
+            executor = ExecutorFactory.get_or_create()
+
+            def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
+                return [
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.DELETED,
+                        snapshot_id=self.snapshot_id,
+                        sequence_number=entry.sequence_number,
+                        file_sequence_number=entry.file_sequence_number,
+                        data_file=entry.data_file,
+                    )
+                    for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True)
+                    if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files
+                ]
+
+            list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
+            return list(itertools.chain(*list_of_entries))
+        else:
+            return []
+
+    def _deleted_entries(self) -> list[ManifestEntry]:
+        """Check if we need to mark the files as deleted."""
+        return self._cached_deleted_entries
+
+    def _existing_manifests(self) -> list[ManifestFile]:
+        """To determine if there are any existing manifests."""
+        return self._get_existing_manifests()

Review Comment:
   nit: Looks like these docstrings were copy pasta'd over from the other 
classes and don't fit how they are used here. We can either remove them or 
change them to fit their usage.



##
pyiceber

Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


qzyu999 commented on PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#issuecomment-4257371529

   Hi @geruh, thanks for the awesome feedback, I've responded to each of your 
replies.





Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3090758831


##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +667,96 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
+        super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to rewrite
+        if self._deleted_data_files or self._added_data_files:
+            # Grab the entries that we actually found in the table's manifests
+            deleted_entries = self._deleted_entries()
+            found_deleted_files = {entry.data_file for entry in deleted_entries}
+
+            # If the user asked to delete files that aren't in the table, abort.
+            if len(found_deleted_files) != len(self._deleted_data_files):
+                raise ValueError("Cannot delete files that are not present in the table")
+
+            added_records = sum(f.record_count for f in self._added_data_files)
+            deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)
+
+            if added_records > deleted_records:

Review Comment:
   Hi @geruh, thanks for flagging this. You're right that this is a safety 
guard, but it doesn't yet account for future changes such as rewriting delete 
files. Should we add something like this?
   
   ```python
   # Note: This physical record count invariant is a sanity guard for data file
   # compaction to ensure no data is accidentally duplicated or invented.
   # TODO: This will need to be evolved into a logical record count validation
   # once PyIceberg supports rewriting delete files (Merge-on-Read).
   added_records = sum(f.record_count for f in self._added_data_files)
   deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)
   
   if added_records > deleted_records:
       raise ValueError(f"Invalid replace: records added ({added_records}) exceeds records removed ({deleted_records})")
   ```
   This logical record count validation would require the `_commit` method to 
do the following, which the codebase currently cannot do:
   - Identify associated delete files: for every `DataFile` being deleted, 
find every position delete or equality delete file that points to it.
   - Calculate the "subtraction": subtract those delete row counts from the 
physical `record_count` of the old files to get the old logical count.
   - Compare: verify that the old logical count equals the new logical count.
   
   The current `_RewriteFiles` implementation is "blind" to deletes. It only 
tracks `_added_data_files` and `_deleted_data_files`.
   I believe this can be part of a full MoR implementation, something that I 
would love to work on after finishing these maintenance tasks.
   
   Otherwise, I can also remove it from `_RewriteFiles` and move forward, WDYT?
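
To make the three steps concrete, here is a rough standalone sketch. It is not PyIceberg code: `DataFileSketch`, `PositionDeleteSketch`, `logical_count` and `validate_rewrite` are hypothetical names. It also assumes each delete file targets exactly one data file and that no row is deleted twice, which real position and equality deletes do not guarantee.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFileSketch:
    path: str
    record_count: int  # physical rows in the file


@dataclass(frozen=True)
class PositionDeleteSketch:
    target_path: str  # data file the deletes point at (assumed to be exactly one)
    deleted_rows: int


def logical_count(files: list[DataFileSketch], deletes: list[PositionDeleteSketch]) -> int:
    """Physical rows minus the rows removed by delete files that target these files."""
    paths = {f.path for f in files}
    physical = sum(f.record_count for f in files)
    # Step 1: find the delete files associated with the data files being replaced.
    removed = sum(d.deleted_rows for d in deletes if d.target_path in paths)
    # Step 2: the "subtraction".
    return physical - removed


def validate_rewrite(
    old_files: list[DataFileSketch],
    new_files: list[DataFileSketch],
    deletes: list[PositionDeleteSketch],
) -> None:
    old_logical = logical_count(old_files, deletes)
    # New files are assumed to be written with the deletes already applied.
    new_logical = sum(f.record_count for f in new_files)
    # Step 3: compare.
    if old_logical != new_logical:
        raise ValueError(f"Invalid replace: old logical count {old_logical} != new logical count {new_logical}")
```

Under these assumptions, compacting a 100-row file that has 10 position deletes into a 90-row file passes. Rewriting it into a 100-row file is rejected.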






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3090686330


##
tests/table/test_replace.py:
##
@@ -0,0 +1,458 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import cast
+
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.manifest import (
+    DataFile,
+    DataFileContent,
+    FileFormat,
+    ManifestEntry,
+    ManifestEntryStatus,
+)
+from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation, Snapshot, Summary
+from pyiceberg.typedef import Record
+
+
+def test_replace_internally(catalog: Catalog) -> None:

Review Comment:
   Hi @geruh, these are all great test ideas, I've added them in d939b67.






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3090684619


##
tests/table/test_replace.py:
##
@@ -0,0 +1,458 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import cast
+
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.manifest import (
+    DataFile,
+    DataFileContent,
+    FileFormat,
+    ManifestEntry,
+    ManifestEntryStatus,
+)
+from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation, Snapshot, Summary
+from pyiceberg.typedef import Record
+
+
+def test_replace_internally(catalog: Catalog) -> None:
+    # Setup a basic table using the catalog fixture
+    catalog.create_namespace("default")
+    table = catalog.create_table(
+        identifier="default.test_replace",
+        schema=Schema(),
+    )
+
+    # 1. File we will delete
+    file_to_delete = DataFile.from_args(
+        file_path="s3://bucket/test/data/deleted.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=100,
+        file_size_in_bytes=1024,
+        content=DataFileContent.DATA,
+    )
+    file_to_delete.spec_id = 0
+
+    # 2. File we will leave completely untouched
+    file_to_keep = DataFile.from_args(
+        file_path="s3://bucket/test/data/kept.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=50,
+        file_size_in_bytes=512,
+        content=DataFileContent.DATA,
+    )
+    file_to_keep.spec_id = 0
+
+    # 3. File we are adding as a replacement
+    file_to_add = DataFile.from_args(
+        file_path="s3://bucket/test/data/added.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=100,
+        file_size_in_bytes=1024,
+        content=DataFileContent.DATA,
+    )
+    file_to_add.spec_id = 0
+
+    # Initially append BOTH the file to delete and the file to keep
+    with table.transaction() as tx:
+        with tx.update_snapshot().fast_append() as append_snapshot:
+            append_snapshot.append_data_file(file_to_delete)
+            append_snapshot.append_data_file(file_to_keep)
+
+    old_snapshot = cast(Snapshot, table.current_snapshot())
+    old_snapshot_id = old_snapshot.snapshot_id
+    old_sequence_number = cast(int, old_snapshot.sequence_number)
+
+    # Call the internal replace API
+    with table.transaction() as tx:
+        with tx.update_snapshot().replace() as rewrite:
+            rewrite.delete_data_file(file_to_delete)
+            rewrite.append_data_file(file_to_add)
+
+    snapshot = cast(Snapshot, table.current_snapshot())
+    summary = cast(Summary, snapshot.summary)
+
+    # 1. Has a unique snapshot ID
+    assert snapshot.snapshot_id is not None
+    assert snapshot.snapshot_id != old_snapshot_id
+
+    # 2. Parent points to the previous snapshot
+    assert snapshot.parent_snapshot_id == old_snapshot_id
+
+    # 3. Sequence number is exactly previous + 1
+    assert snapshot.sequence_number == old_sequence_number + 1
+
+    # 4. Operation type is set to "replace"
+    assert summary["operation"] == Operation.REPLACE
+
+    # 5. Manifest list path is correct (just verify it exists and is a string path)
+    assert snapshot.manifest_list is not None
+    assert isinstance(snapshot.manifest_list, str)
+
+    # 6. Summary counts are accurate
+    assert summary["added-data-files"] == "1"
+    assert summary["deleted-data-files"] == "1"
+    assert summary["added-records"] == "100"
+    assert summary["deleted-records"] == "100"
+    assert summary["total-records"] == "150"
+
+    # Fetch all entries from the new manifests
+    manifest_files = snapshot.manifests(table.io)
+    entries: list[ManifestEntry] = []
+    for manifest in manifest_files:
+        entries.extend(manifest.fetch_manifest_entry(table.io, discard_deleted=False))
+
+    # We expect 3 entries: ADDED, DELETED, and EXISTING
+    assert len(entries) == 3
+
+    # Check ADDED
+    added_entries = [e for e in entries if e.status == ManifestEntryStatus.ADDED]
+    assert len(added_entries) == 1
+    assert added_entries[0].data_file.file_path == file_to_add.file_path
+    assert add

Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3090684619


##
tests/table/test_replace.py:
##
@@ -0,0 +1,458 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import cast
+
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.manifest import (
+DataFile,
+DataFileContent,
+FileFormat,
+ManifestEntry,
+ManifestEntryStatus,
+)
+from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation, Snapshot, Summary
+from pyiceberg.typedef import Record
+
+
+def test_replace_internally(catalog: Catalog) -> None:
+    # Setup a basic table using the catalog fixture
+    catalog.create_namespace("default")
+    table = catalog.create_table(
+        identifier="default.test_replace",
+        schema=Schema(),
+    )
+
+    # 1. File we will delete
+    file_to_delete = DataFile.from_args(
+        file_path="s3://bucket/test/data/deleted.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=100,
+        file_size_in_bytes=1024,
+        content=DataFileContent.DATA,
+    )
+    file_to_delete.spec_id = 0
+
+    # 2. File we will leave completely untouched
+    file_to_keep = DataFile.from_args(
+        file_path="s3://bucket/test/data/kept.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=50,
+        file_size_in_bytes=512,
+        content=DataFileContent.DATA,
+    )
+    file_to_keep.spec_id = 0
+
+    # 3. File we are adding as a replacement
+    file_to_add = DataFile.from_args(
+        file_path="s3://bucket/test/data/added.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=100,
+        file_size_in_bytes=1024,
+        content=DataFileContent.DATA,
+    )
+    file_to_add.spec_id = 0
+
+    # Initially append BOTH the file to delete and the file to keep
+    with table.transaction() as tx:
+        with tx.update_snapshot().fast_append() as append_snapshot:
+            append_snapshot.append_data_file(file_to_delete)
+            append_snapshot.append_data_file(file_to_keep)
+
+    old_snapshot = cast(Snapshot, table.current_snapshot())
+    old_snapshot_id = old_snapshot.snapshot_id
+    old_sequence_number = cast(int, old_snapshot.sequence_number)
+
+    # Call the internal replace API
+    with table.transaction() as tx:
+        with tx.update_snapshot().replace() as rewrite:
+            rewrite.delete_data_file(file_to_delete)
+            rewrite.append_data_file(file_to_add)
+
+    snapshot = cast(Snapshot, table.current_snapshot())
+    summary = cast(Summary, snapshot.summary)
+
+    # 1. Has a unique snapshot ID
+    assert snapshot.snapshot_id is not None
+    assert snapshot.snapshot_id != old_snapshot_id
+
+    # 2. Parent points to the previous snapshot
+    assert snapshot.parent_snapshot_id == old_snapshot_id
+
+    # 3. Sequence number is exactly previous + 1
+    assert snapshot.sequence_number == old_sequence_number + 1
+
+    # 4. Operation type is set to "replace"
+    assert summary["operation"] == Operation.REPLACE
+
+    # 5. Manifest list path is correct (just verify it exists and is a string path)
+    assert snapshot.manifest_list is not None
+    assert isinstance(snapshot.manifest_list, str)
+
+    # 6. Summary counts are accurate
+    assert summary["added-data-files"] == "1"
+    assert summary["deleted-data-files"] == "1"
+    assert summary["added-records"] == "100"
+    assert summary["deleted-records"] == "100"
+    assert summary["total-records"] == "150"
+
+    # Fetch all entries from the new manifests
+    manifest_files = snapshot.manifests(table.io)
+    entries: list[ManifestEntry] = []
+    for manifest in manifest_files:
+        entries.extend(manifest.fetch_manifest_entry(table.io, discard_deleted=False))
+
+    # We expect 3 entries: ADDED, DELETED, and EXISTING
+    assert len(entries) == 3
+
+    # Check ADDED
+    added_entries = [e for e in entries if e.status == ManifestEntryStatus.ADDED]
+    assert len(added_entries) == 1
+    assert added_entries[0].data_file.file_path == file_to_add.file_path
+    assert add

Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3090633495


##
pyiceberg/table/update/snapshot.py:
##
@@ -724,6 +814,14 @@ def delete(self) -> _DeleteFiles:
             snapshot_properties=self._snapshot_properties,
         )
 
+    def replace(self) -> _RewriteFiles:
+        return _RewriteFiles(

Review Comment:
   Hi @geruh, you bring up a good point, and the mismatch seemed off to me 
along the way as well. The discrepancy exists because we're mirroring what's 
found in the Java code itself.
   
   - Looking here 
(https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/RewriteFiles.html),
 you can see that the "API for replacing files in a table." is called 
`RewriteFiles`.
   - Looking here 
(https://iceberg.apache.org/javadoc/1.4.2/org/apache/iceberg/DataOperations.html#REPLACE),
 you can see that the `REPLACE` operation is implemented by `RewriteFiles`.
   - On a slightly unrelated note here 
(https://iceberg.apache.org/javadoc/1.4.2/org/apache/iceberg/DataOperations.html#OVERWRITE),
 the `OVERWRITE` operation itself can be implemented by `ReplacePartitions`.
   
   I named the Python API `replace()` to accurately reflect the 
`Operation.REPLACE` snapshot string it generates, while keeping the internal 
class named `_RewriteFiles` to match the Java builder logic.
   
   That said, if you feel strongly about matching the Java API's user-facing 
method (`rewrite()`) rather than the snapshot operation (`replace()`), I'm 
happy to rename the public method to `rewrite()` for consistency. Let me know 
what you prefer!






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3090123085


##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +667,96 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
+        super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to rewrite
+        if self._deleted_data_files or self._added_data_files:

Review Comment:
   Hi @geruh, great suggestion. I've applied the changes in c8162a8.






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3089966996


##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +667,96 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
+        super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to rewrite
+        if self._deleted_data_files or self._added_data_files:
+            # Grab the entries that we actually found in the table's manifests
+            deleted_entries = self._deleted_entries()
+            found_deleted_files = {entry.data_file for entry in deleted_entries}
+
+            # If the user asked to delete files that aren't in the table, abort.
+            if len(found_deleted_files) != len(self._deleted_data_files):
+                raise ValueError("Cannot delete files that are not present in the table")
+
+            added_records = sum(f.record_count for f in self._added_data_files)
+            deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)
+
+            if added_records > deleted_records:
+                raise ValueError(f"Invalid replace: records added ({added_records}) exceeds records removed ({deleted_records})")
+
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _deleted_entries(self) -> list[ManifestEntry]:
+        """Check if we need to mark the files as deleted."""
+        if self._parent_snapshot_id is not None:
+            previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
+            if previous_snapshot is None:
+                raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")
+
+            executor = ExecutorFactory.get_or_create()
+
+            def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
+                return [
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.DELETED,
+                        snapshot_id=self.snapshot_id,
+                        sequence_number=entry.sequence_number,
+                        file_sequence_number=entry.file_sequence_number,
+                        data_file=entry.data_file,
+                    )
+                    for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True)
+                    if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files
+                ]
+
+            list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
+            return list(itertools.chain(*list_of_entries))
+        else:
+            return []
+
+    def _existing_manifests(self) -> list[ManifestFile]:
+        """To determine if there are any existing manifests."""

Review Comment:
   Hi @geruh, thank you for the feedback! I agree that the two pieces of code 
are highly similar. In b0a770c I created a `_get_existing_manifests` function 
in `_SnapshotProducer` that is reused by both `_OverwriteFiles` and 
`_RewriteFiles`.
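   
   As a rough illustration of that refactor, here is a toy sketch rather than 
the actual PyIceberg code: `ProducerSketch`, `RewriteSketch`, 
`OverwriteSketch`, and the list-of-paths manifests are hypothetical stand-ins, 
and the sorting step is only a placeholder for whatever extra logic overwrite 
needs.

```python
class ProducerSketch:
    """Hypothetical stand-in for `_SnapshotProducer`; a manifest is a list of file paths."""

    def __init__(self, manifests: list[list[str]], deleted: set[str]) -> None:
        self._manifests = manifests
        self._deleted = deleted

    def _get_existing_manifests(self) -> list[list[str]]:
        # Shared helper: drop deleted paths and skip manifests left empty.
        kept = []
        for manifest in self._manifests:
            remaining = [path for path in manifest if path not in self._deleted]
            if remaining:
                kept.append(remaining)
        return kept


class RewriteSketch(ProducerSketch):
    def existing_manifests(self) -> list[list[str]]:
        # Rewrite uses the shared helper as-is.
        return self._get_existing_manifests()


class OverwriteSketch(ProducerSketch):
    def existing_manifests(self) -> list[list[str]]:
        # Overwrite layers its own step on top; sorting is only a placeholder.
        return sorted(self._get_existing_manifests())


manifests = [["b.parquet", "c.parquet"], ["a.parquet"], ["d.parquet"]]
rewrite_result = RewriteSketch(manifests, {"d.parquet"}).existing_manifests()
overwrite_result = OverwriteSketch(manifests, {"d.parquet"}).existing_manifests()
```

   Both subclasses share one filtering routine, so a fix to the helper applies 
to both operations.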






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


geruh commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3089060985


##
pyiceberg/table/update/snapshot.py:
##
@@ -724,6 +814,14 @@ def delete(self) -> _DeleteFiles:
             snapshot_properties=self._snapshot_properties,
         )
 
+    def replace(self) -> _RewriteFiles:
+        return _RewriteFiles(

Review Comment:
   I'm a bit confused by the naming: we are introducing a user-facing API 
`replace`, but the underlying snapshot operation is a `rewrite`. Should we 
rename it to `rewrite()` for consistency, or am I missing something?






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-04-15 Thread via GitHub


geruh commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3088999476


##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +667,96 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
+        super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to rewrite
+        if self._deleted_data_files or self._added_data_files:
+            # Grab the entries that we actually found in the table's manifests
+            deleted_entries = self._deleted_entries()
+            found_deleted_files = {entry.data_file for entry in deleted_entries}
+
+            # If the user asked to delete files that aren't in the table, abort.
+            if len(found_deleted_files) != len(self._deleted_data_files):
+                raise ValueError("Cannot delete files that are not present in the table")
+
+            added_records = sum(f.record_count for f in self._added_data_files)
+            deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)
+
+            if added_records > deleted_records:
+                raise ValueError(f"Invalid replace: records added ({added_records}) exceeds records removed ({deleted_records})")
+
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _deleted_entries(self) -> list[ManifestEntry]:
+        """Check if we need to mark the files as deleted."""
+        if self._parent_snapshot_id is not None:
+            previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
+            if previous_snapshot is None:
+                raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")
+
+            executor = ExecutorFactory.get_or_create()
+
+            def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
+                return [
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.DELETED,
+                        snapshot_id=self.snapshot_id,
+                        sequence_number=entry.sequence_number,
+                        file_sequence_number=entry.file_sequence_number,
+                        data_file=entry.data_file,
+                    )
+                    for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True)
+                    if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files
+                ]
+
+            list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
+            return list(itertools.chain(*list_of_entries))
+        else:
+            return []
+
+    def _existing_manifests(self) -> list[ManifestFile]:
+        """To determine if there are any existing manifests."""

Review Comment:
   This logic looks nearly identical to 
`_OverwriteFiles._existing_manifests()` and could probably use some clean 
Python OOP across the different snapshot classes. Let's create a helper in the 
`_SnapshotProducer` class; then overwrite can add its additional logic on top, 
and rewrite can just use it.



##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +667,96 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
+        super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to rewrite
+        if self._deleted_data_files or self._added_data_files:

Review Comment:
   I think we can replicate the `_DeleteFiles` logic here by using 
`@cached_property` on the `_compute_deletes` function, especially since 
`_commit()` calls `self._deleted_entries()` for validation and then calls the 
super commit, which gets the delete entries again in order to write them.
   
   
https://github.com/apache/iceberg-python/blob/721c5aa504e2c2069a197f01a238070e37e389d8/pyiceberg/table/update/snapshot.py#L407
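   
   A minimal sketch of the idea, using hypothetical stand-in names 
(`RewriteSketch`, `scan_calls`) rather than the real PyIceberg classes: 
`functools.cached_property` runs the scan once, so the validation step and the 
later write share one result.

```python
from functools import cached_property


class RewriteSketch:
    """Hypothetical stand-in for a snapshot producer; not the PyIceberg class."""

    def __init__(self, table_files: list[str], files_to_delete: set[str]) -> None:
        self._table_files = table_files
        self._files_to_delete = files_to_delete
        self.scan_calls = 0  # counts how often the expensive scan runs

    @cached_property
    def deleted_entries(self) -> list[str]:
        # Stands in for reading every manifest of the parent snapshot.
        self.scan_calls += 1
        return [f for f in self._table_files if f in self._files_to_delete]

    def commit(self) -> list[str]:
        # Validation reads the cached value ...
        if len(self.deleted_entries) != len(self._files_to_delete):
            raise ValueError("Cannot delete files that are not present in the table")
        # ... and the write path reuses it without a second scan.
        return self.deleted_entries


producer = RewriteSketch(["a.parquet", "b.parquet"], {"a.parquet"})
result = producer.commit()
```

   The property is computed on first access and stored on the instance, so it 
suits values that do not change during the lifetime of the producer.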



##
pyiceberg/table/update/snapshot.py:
##
@@ -667,6 +667,96 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
             return []
 
 
+class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
+    """A snapshot producer that rewrites data files."""
+
+    def __init__(

Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-03-27 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3004042734


##
tests/table/test_replace.py:
##
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from pyiceberg.catalog import Catalog
+from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
+from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation
+from pyiceberg.typedef import Record
+
+
+def test_replace_api(catalog: Catalog) -> None:
+    # Setup a basic table using the catalog fixture
+    catalog.create_namespace("default")
+    table = catalog.create_table(
+        identifier="default.test_replace",
+        schema=Schema(),
+    )
+
+    # Create mock DataFiles for the test
+    file_to_delete = DataFile.from_args(
+        file_path="s3://bucket/test/data/deleted.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=100,
+        file_size_in_bytes=1024,
+        content=DataFileContent.DATA,
+    )
+    file_to_delete.spec_id = 0
+
+    file_to_add = DataFile.from_args(
+        file_path="s3://bucket/test/data/added.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=100,
+        file_size_in_bytes=1024,
+        content=DataFileContent.DATA,
+    )
+    file_to_add.spec_id = 0
+
+    # Initially append to have something to replace
+    with table.transaction() as tx:
+        with tx.update_snapshot().fast_append() as append_snapshot:
+            append_snapshot.append_data_file(file_to_delete)
+
+    # Verify initial append snapshot
+    assert len(table.history()) == 1
+    snapshot = table.current_snapshot()
+    assert snapshot is not None
+    assert snapshot.summary is not None
+    assert snapshot.summary["operation"] == Operation.APPEND
+
+    # Call the replace API
+    table.replace(files_to_delete=[file_to_delete], files_to_add=[file_to_add])
+
+    # Verify the replacement created a REPLACE snapshot
+    assert len(table.history()) == 2
+    snapshot = table.current_snapshot()
+    assert snapshot is not None
+    assert snapshot.summary is not None
+    assert snapshot.summary["operation"] == Operation.REPLACE
+
+    # Verify the correct files are added and deleted
+    # The summary property tracks these counts
+    assert snapshot.summary["added-data-files"] == "1"
+    assert snapshot.summary["deleted-data-files"] == "1"
+    assert snapshot.summary["added-records"] == "100"
+    assert snapshot.summary["deleted-records"] == "100"
+
+    # Verify the new file exists in the new manifest
+    manifest_files = snapshot.manifests(table.io)
+    assert len(manifest_files) == 2  # One for ADDED, one for DELETED
+
+    # Check that sequence numbers were handled properly natively by verifying the manifest contents
+    entries = []
+    for manifest in manifest_files:
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
+            entries.append(entry)
+
+    # One entry for ADDED (new file), one for DELETED (old file)
+    assert len(entries) == 2

Review Comment:
   Hi @kevinjqliu, I've addressed this in the latest commit, 
33aaef0817b1366d80bcd8194a0c2ca5ba6f46f2, which tests the status of each file.






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-03-27 Thread via GitHub


qzyu999 commented on PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#issuecomment-4146501702

   Hi @kevinjqliu, apologies for the delay, and thank you for taking the time 
to review the PR again; I understand that you are quite busy. I've addressed 
all your points in the latest set of tests in 
33aaef0817b1366d80bcd8194a0c2ca5ba6f46f2, expanding the tests to cover those 
requirements.
   
   I did notice two minor issues, however:
   - Requirement: If the difference is due to prior soft-deletes, confirm those 
delete files account for it
     - This would require `_RewriteFiles` to be scoped to handle delete 
manifests, but currently it only handles data files. Handling delete manifests 
would let us run `REPLACE` operations on delete files. For example, the purpose 
of this PR is to allow compaction of data files, but in theory we could also 
compact delete files for the case where someone has run many delete operations 
on many small files.
     - I think this is definitely worth working on, but perhaps not in this 
PR. The Java code seems to handle this well. My thinking is that after we merge 
this `REPLACE` support, we work on the data compaction issue next, then come 
back to extend `_RewriteFiles` to delete manifests, and work on metadata 
compaction after that.
   - Another, more minor issue: when running the tests and calling 
`fast_append()` on files that are `DataFileContent.POSITION_DELETES`, the 
resulting manifests are not yet labeled as `ManifestContent.DELETES`. IIUC this 
is because `_SnapshotProducer._manifests()` (which `fast_append` relies on 
under the hood) currently defaults to creating standard `ManifestContent.DATA` 
writers. It doesn't yet inspect the incoming file's content type to route 
`POSITION_DELETES` into a dedicated `ManifestContent.DELETES` writer. I worked 
around this in my test by scanning the manifest entries directly rather than 
relying on the manifest's label, but I wanted to flag it for the roadmap for 
when we build out full merge-on-read write support.
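   
   To illustrate the routing described in the last point, here is a small 
sketch under stated assumptions: `FileContent`, `ManifestKind`, `FileSketch`, 
and `route_by_content` are hypothetical names that only mirror the content 
codes from the Iceberg spec, and none of this is the PyIceberg writer code.

```python
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum


class FileContent(Enum):
    DATA = 0
    POSITION_DELETES = 1
    EQUALITY_DELETES = 2


class ManifestKind(Enum):
    DATA = 0
    DELETES = 1


@dataclass(frozen=True)
class FileSketch:
    path: str
    content: FileContent


def route_by_content(files: list[FileSketch]) -> dict[ManifestKind, list[str]]:
    """Group file paths by the kind of manifest that should track them."""
    routed: dict[ManifestKind, list[str]] = defaultdict(list)
    for f in files:
        # Only DATA files belong in a data manifest; both delete kinds go to DELETES.
        kind = ManifestKind.DATA if f.content is FileContent.DATA else ManifestKind.DELETES
        routed[kind].append(f.path)
    return dict(routed)


routed = route_by_content(
    [
        FileSketch("data-1.parquet", FileContent.DATA),
        FileSketch("pos-deletes-1.parquet", FileContent.POSITION_DELETES),
        FileSketch("data-2.parquet", FileContent.DATA),
    ]
)
```

   With routing like this, a reader could rely on the manifest's content label 
instead of scanning every entry.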





Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-03-27 Thread via GitHub


qzyu999 commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r3002985921


##
pyiceberg/table/__init__.py:
##
@@ -450,6 +450,32 @@ def update_statistics(self) -> UpdateStatistics:
         """
         return UpdateStatistics(transaction=self)
 
+    def replace(

Review Comment:
   Hi @kevinjqliu, that makes sense: it would be dangerous for users to call 
these functions when we cannot enforce the underlying expectations on their 
inputs. I removed them in 94bd87e1d64f8ee524dd46b789e90eacbe471076.






Re: [PR] feat: Add metadata-only replace API to Table for REPLACE snapshot operations [iceberg-python]

2026-03-25 Thread via GitHub


kevinjqliu commented on code in PR #3131:
URL: https://github.com/apache/iceberg-python/pull/3131#discussion_r2991623781


##
pyiceberg/table/__init__.py:
##
@@ -450,6 +450,32 @@ def update_statistics(self) -> UpdateStatistics:
         """
         return UpdateStatistics(transaction=self)
 
+    def replace(

Review Comment:
   We should not expose `replace` as a public function, since we cannot 
guarantee that `files_to_delete` and `files_to_add` contain the same records.
   
   I think we should start at `_RewriteFiles`.
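   
   To make the concern concrete, here is a small sketch of the kind of guard a 
producer such as `_RewriteFiles` could apply. The `validate_replace` helper and 
its arguments are hypothetical, not part of PyIceberg. Comparing record counts 
can only reject an obviously invalid replace; it cannot prove that both file 
sets hold the same records.

```python
def validate_replace(added_record_counts: list[int], deleted_record_counts: list[int]) -> None:
    """Hypothetical guard: reject a replace that would add more records than it removes."""
    added = sum(added_record_counts)
    deleted = sum(deleted_record_counts)
    if added > deleted:
        raise ValueError(f"Invalid replace: records added ({added}) exceeds records removed ({deleted})")


# Compacting two 50-record files into one 100-record file passes the check.
validate_replace(added_record_counts=[100], deleted_record_counts=[50, 50])

# Adding more records than were removed is rejected.
try:
    validate_replace(added_record_counts=[101], deleted_record_counts=[100])
    rejected = False
except ValueError:
    rejected = True
```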



##
tests/table/test_replace.py:
##
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from pyiceberg.catalog import Catalog
+from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
+from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation
+from pyiceberg.typedef import Record
+
+
+def test_replace_api(catalog: Catalog) -> None:
+    # Setup a basic table using the catalog fixture
+    catalog.create_namespace("default")
+    table = catalog.create_table(
+        identifier="default.test_replace",
+        schema=Schema(),
+    )
+
+    # Create mock DataFiles for the test
+    file_to_delete = DataFile.from_args(
+        file_path="s3://bucket/test/data/deleted.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=100,
+        file_size_in_bytes=1024,
+        content=DataFileContent.DATA,
+    )
+    file_to_delete.spec_id = 0
+
+    file_to_add = DataFile.from_args(
+        file_path="s3://bucket/test/data/added.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=100,
+        file_size_in_bytes=1024,
+        content=DataFileContent.DATA,
+    )
+    file_to_add.spec_id = 0
+
+    # Initially append to have something to replace
+    with table.transaction() as tx:
+        with tx.update_snapshot().fast_append() as append_snapshot:
+            append_snapshot.append_data_file(file_to_delete)
+
+    # Verify initial append snapshot
+    assert len(table.history()) == 1
+    snapshot = table.current_snapshot()
+    assert snapshot is not None
+    assert snapshot.summary is not None
+    assert snapshot.summary["operation"] == Operation.APPEND
+
+    # Call the replace API
+    table.replace(files_to_delete=[file_to_delete], files_to_add=[file_to_add])
+
+    # Verify the replacement created a REPLACE snapshot
+    assert len(table.history()) == 2
+    snapshot = table.current_snapshot()
+    assert snapshot is not None
+    assert snapshot.summary is not None
+    assert snapshot.summary["operation"] == Operation.REPLACE
+
+    # Verify the correct files are added and deleted
+    # The summary property tracks these counts
+    assert snapshot.summary["added-data-files"] == "1"
+    assert snapshot.summary["deleted-data-files"] == "1"
+    assert snapshot.summary["added-records"] == "100"
+    assert snapshot.summary["deleted-records"] == "100"
+
+    # Verify the new file exists in the new manifest
+    manifest_files = snapshot.manifests(table.io)
+    assert len(manifest_files) == 2  # One for ADDED, one for DELETED
+
+    # Check that sequence numbers were handled properly natively by verifying the manifest contents
+    entries = []
+    for manifest in manifest_files:
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
+            entries.append(entry)
+
+    # One entry for ADDED (new file), one for DELETED (old file)
+    assert len(entries) == 2

Review Comment:
   We need to test the status of each file a bit more.
   
   
https://github.com/apache/iceberg/blob/main/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
 is a good reference
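   
   A sketch of what per-status assertions could look like, using a 
hypothetical `Status` enum and `(status, path)` tuples in place of real 
manifest entries. The numeric values follow the manifest entry status codes in 
the Iceberg spec; everything else is an assumption for illustration.

```python
from enum import Enum


class Status(Enum):
    EXISTING = 0
    ADDED = 1
    DELETED = 2


# Toy entries for a replace that swaps one file and leaves another untouched.
entries = [
    (Status.ADDED, "s3://bucket/test/data/added.parquet"),
    (Status.DELETED, "s3://bucket/test/data/deleted.parquet"),
    (Status.EXISTING, "s3://bucket/test/data/kept.parquet"),
]

# Group paths by status so each status can be asserted on separately.
paths_by_status = {status: [path for s, path in entries if s is status] for status in Status}
```

   Asserting on each group, rather than only on the total entry count, catches 
a file that ends up with the wrong status.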



For add