Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2550247726

@flyrain you had indicated that you were interested in this. Can you please review?
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2496928372

@aokolnychyi I have rebased the PR on main. How can we move forward with this? If we agree that the behavior is correct, how about we at least have something that works and optimize it later?
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2481942281

@aokolnychyi I agree that we should stick to existing changelog tasks and always resolve historical deletes to produce the changelog. Have you thought of any optimizations for processing the snapshots? How can we move forward with this?
Re: [PR] Support changelog scan for table with delete files [iceberg]
aokolnychyi commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2430250182

Coming back to some questions above.

> Do we want to resolve equality deletes and map them into data files? Or should we add a new task and output the content of equality delete files?

While it would be useful to stream out the ingested CDC log from Flink without applying equality deletes, we probably shouldn't target that for now. If we start outputting equality deletes directly, our changelog may not be accurate. What if we upsert a record that didn't exist? What if an equality delete applied to 10 data records? We can find reasonable behavior, but let's skip this for now. Let's always resolve and assign equality deletes so that the changelog is precise.

> What if a snapshot adds a new delete file for the already deleted record? That can happen both with position and equality deletes.

There is one more data point here. We are about to introduce sync maintenance for position deletes. This means new delete files will include old + new deleted positions. Therefore, we must always resolve historic deletes.

To sum up, I propose sticking to the existing changelog tasks and always resolving historic deletes to produce a correct changelog. I am concerned about full table scans for each incremental snapshot. I'll look into the ideas mentioned above to see if we can optimize that.
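To make the maintenance point concrete, here is a minimal, hypothetical illustration using plain JDK collections (not Iceberg APIs): if maintenance rewrites position deletes so that a new delete file carries old plus new positions, the changelog must subtract the historic positions to report only the net-new DELETE rows.

```java
import java.util.Set;
import java.util.TreeSet;

public class NetNewDeletesExample {
  public static void main(String[] args) {
    // positions already deleted in earlier snapshots
    Set<Long> historicPositions = new TreeSet<>(Set.of(3L, 7L));
    // a rewritten delete file in the new snapshot: old + new positions
    Set<Long> newDeleteFilePositions = new TreeSet<>(Set.of(3L, 7L, 12L));

    // without resolving historic deletes, positions 3 and 7 would be
    // reported as deleted a second time; subtracting them leaves the
    // single genuine change
    Set<Long> netNewDeletes = new TreeSet<>(newDeleteFilePositions);
    netNewDeletes.removeAll(historicPositions);

    System.out.println(netNewDeletes); // prints [12]
  }
}
```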
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2425520426

> For instance, if two position-based delete operations remove the same set of records, both of them will succeed.

I was not aware of this. Is this specifically the case of two concurrent delete operations starting from the same base snapshot, both removing the same set of records, where one wins the commit race but the other still succeeds in committing? Or is this more general, where a delete that deletes already deleted records is allowed to do so? Let's say we are in the former case; then I'd say that for the purpose of CDC, the second commit contains no changes, so the changelog should not write anything for it.
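For reference, a minimal sketch of the scenario being asked about, assuming a caller that already holds a `Table` and two delete files covering the same positions (the method and parameter names here are hypothetical). Neither row delta opts into conflict validation, so both commits succeed:

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;

class DoubleDeleteSketch {
  // Two position-based deletes created against the same base snapshot.
  // Without validateFromSnapshot()/validateNoConflictingDeleteFiles(),
  // the loser of the commit race still commits successfully.
  static void commitSameDeletesTwice(Table table, DeleteFile deletes, DeleteFile sameDeletesAgain) {
    RowDelta first = table.newRowDelta().addDeletes(deletes);
    RowDelta second = table.newRowDelta().addDeletes(sameDeletesAgain);

    first.commit();
    second.commit(); // also succeeds; the same records are now deleted twice
  }
}
```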
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2420515789

> First of all, we need to discuss the expected behavior:
>
> * Do we want to resolve equality deletes and map them into data files? Or should we add a new task and output the content of equality delete files? I'd say we should support both options (without resolving by default?).

If the table is written by Flink, then for a Flink CDC streaming read the lazy solution (not resolving the equality deletes) would be enough. If there is another writer for the table which creates equality delete files that do not conform to Flink's conventions, or the user wants a retracting CDC stream when the table was written by an upsert CDC stream, then equality delete resolution is still needed.

> * What if a snapshot adds a new delete file for the already deleted record? That can happen both with position and equality deletes. For instance, if two position-based delete operations remove the same set of records, both of them will succeed. Producing a precise CDC log would require reading all historic deletes, which may be unnecessarily expensive in some cases. I'd say this should be configurable as well.

I think that applying delete files is less costly, so I would stick to the theoretically correct solution and apply previously added delete files to produce the precise CDC log.
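A minimal sketch of the lazy option described here, under the assumption (this is not an existing Iceberg API) that the scan would simply replay each row of an equality delete file as a DELETE changelog record carrying the equality columns; the row types below are hypothetical stand-ins:

```java
import java.util.List;

class LazyEqualityDeleteSketch {
  // Hypothetical row shapes; a real implementation would use Iceberg's
  // StructLike rows and the table's equality field ids.
  record EqualityDeleteRow(List<Object> equalityFieldValues) {}

  record ChangelogRow(String changeType, List<Object> values) {}

  // Emit the content of an equality delete file as DELETE records,
  // without resolving it against data files.
  static List<ChangelogRow> emitLazily(List<EqualityDeleteRow> deleteFileRows) {
    return deleteFileRows.stream()
        .map(row -> new ChangelogRow("DELETE", row.equalityFieldValues()))
        .toList();
  }
}
```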
Re: [PR] Support changelog scan for table with delete files [iceberg]
aokolnychyi commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2418069515

What I wrote above is more complicated; we need to understand whether that complexity will be justified. I am not sure yet.
Re: [PR] Support changelog scan for table with delete files [iceberg]
aokolnychyi commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2418068303

I went through some of my old notes, which we should discuss. We have the following tasks right now:

- `AddedRowsScanTask` (added data file + deletes that happened within the same snapshot).
- `DeletedDataFileScanTask` (removed data file + deletes that applied to it before it was removed).
- `DeletedRowsScanTask` (data file that was affected by a delete file (if we resolve equality deletes or we had position deletes) + historic deletes that were there before + new deletes added in this snapshot).

First of all, we need to discuss the expected behavior:

- Do we want to resolve equality deletes and map them into data files? Or should we add a new task and output the content of equality delete files? I'd say we should support both options (without resolving by default?).
- What if a snapshot adds a new delete file for an already deleted record? That can happen with both position and equality deletes. For instance, if two position-based delete operations remove the same set of records, both of them will succeed. Producing a precise CDC log would require reading all historic deletes, which may be unnecessarily expensive in some cases. I'd say this should be configurable as well.

If we want to make resolving equality deletes optional, we need `AddedEqualityDeletesScanTask`. We discussed this for Flink CDC use cases. It is going to be costly at planning and query time to apply equality deletes to data files to get the removed records. As long as the equality delete persists the entire row, or the caller is OK with only the equality columns, it should be fine to output the content of equality delete files as is.

I agree with the idea of iterating snapshot by snapshot. I wonder if we can optimize it, though.

**AddedRowsScanTask**

We don't have to look up historic deletes, as the file was just added, except for the position deletes added in the same snapshot. For each added data file, it is enough to look up matching position deletes in a `DeleteFileIndex` built from delete manifests added in this snapshot (see the first sketch after this comment).

**DeletedDataFileScanTask**

We have to build a `DeleteFileIndex` that includes all historic deletes for removed data files. We can optimize this step by reading delete manifests only for the affected partitions. We can create a `PartitionMap` predicate from new data files and use it while reading delete manifests. We can supplement that with a predicate on `referencedDataFile` in the future. Historic delete files that do not affect the partitions of deleted data files can be discarded.

**DeletedRowsScanTask**

For each position delete file and each equality delete file (if those must be resolved), find matching data files and historic deletes (if configured to resolve double deletes). We can still build a partition predicate from all delete files that were added in this snapshot and use that predicate to prune the set of data manifests for that snapshot (see the second sketch after this comment). For each newly added delete file, output a task for each affected data file (one delete file can match multiple data files).

**AddedEqualityDeletesScanTask**

Output each added equality delete file as a separate task, without matching them to data files (if configured).
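A rough sketch of the `AddedRowsScanTask` lookup described above. `DeleteFileIndex` is package-private in `org.apache.iceberg`, so this would only compile inside that package; the builder and lookup signatures are assumed from the core module, and the surrounding plumbing is hypothetical:

```java
// inside package org.apache.iceberg
import java.util.List;
import java.util.stream.Collectors;

class AddedRowsPlanningSketch {
  static void planAddedRows(Table table, Snapshot snapshot, List<DataFile> addedDataFiles) {
    // only delete manifests written by this snapshot can affect files it added
    List<ManifestFile> newDeleteManifests =
        snapshot.deleteManifests(table.io()).stream()
            .filter(m -> m.snapshotId() != null && m.snapshotId() == snapshot.snapshotId())
            .collect(Collectors.toList());

    DeleteFileIndex sameSnapshotDeletes =
        DeleteFileIndex.builderFor(table.io(), newDeleteManifests)
            .specsById(table.specs())
            .build();

    for (DataFile added : addedDataFiles) {
      // position deletes from the same snapshot that apply to the new file
      DeleteFile[] matching = sameSnapshotDeletes.forDataFile(snapshot.sequenceNumber(), added);
      // wrap `added` and `matching` into an AddedRowsScanTask here
    }
  }
}
```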
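And a self-contained sketch of the partition-pruning idea for the delete-driven tasks, using hypothetical stand-in types rather than Iceberg's manifest classes: collect the partitions touched by delete files added in the snapshot, then scan only the data manifests whose partition summaries can intersect that set.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionPruningSketch {
  // stand-ins for a delete file's partition and a manifest's partition summary
  record DeleteFile(String partition) {}

  record DataManifest(String path, Set<String> partitions) {}

  static List<DataManifest> prune(List<DataManifest> dataManifests, List<DeleteFile> newDeletes) {
    Set<String> affected =
        newDeletes.stream().map(DeleteFile::partition).collect(Collectors.toSet());
    // manifests touching no affected partition cannot contribute DeletedRowsScanTasks
    return dataManifests.stream()
        .filter(m -> m.partitions().stream().anyMatch(affected::contains))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<DataManifest> manifests =
        List.of(
            new DataManifest("m1.avro", Set.of("p=1", "p=2")),
            new DataManifest("m2.avro", Set.of("p=3")));
    List<DeleteFile> newDeletes = List.of(new DeleteFile("p=3"));

    System.out.println(prune(manifests, newDeletes)); // only m2.avro needs scanning
  }
}
```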
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1803801850

## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:

```diff
@@ -63,33 +61,39 @@ protected CloseableIterable doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);
 
-    Set newDataManifests =
+    Iterable> plans =
         FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
-
-    ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-            .specsById(table().specs())
-            .caseSensitive(isCaseSensitive())
-            .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting()
-            .columnsToKeepStats(columnsToKeepStats());
-
-    if (shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-      manifestGroup = manifestGroup.planWith(planExecutor());
-    }
-
-    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+            .transform(
+                snapshot -> {
+                  List dataManifests = snapshot.dataManifests(table().io());
+                  List deleteManifests = snapshot.deleteManifests(table().io());
+
+                  ManifestGroup manifestGroup =
+                      new ManifestGroup(table().io(), dataManifests, deleteManifests)
```

Review Comment:
I went snapshot by snapshot as it was the easiest to reason with. For sure we need to consider more than just newly added manifests. For each snapshot, we need to consider existing data files if there are new deletes applicable to them.
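A hedged restatement of that per-snapshot classification in code, using a hypothetical entry type in place of Iceberg's manifest entries (the task names are the real changelog task interfaces):

```java
class ChangeTaskClassificationSketch {
  enum Status { ADDED, EXISTING, DELETED }

  // stand-in for a manifest entry plus whether new deletes in this snapshot hit it
  record Entry(Status status, boolean hasNewDeletes) {}

  static String classify(Entry entry) {
    switch (entry.status()) {
      case ADDED:
        return "AddedRowsScanTask"; // new data file (+ same-snapshot deletes)
      case EXISTING:
        // existing files only matter when new deletes apply to them
        return entry.hasNewDeletes() ? "DeletedRowsScanTask" : "no task";
      case DELETED:
        return "DeletedDataFileScanTask"; // removed file + deletes that applied before
      default:
        throw new IllegalArgumentException("Unknown status: " + entry.status());
    }
  }
}
```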
Re: [PR] Support changelog scan for table with delete files [iceberg]
aokolnychyi commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1803692669

## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
## @@ -63,33 +61,39 @@ protected CloseableIterable doPlanFiles(

Review Comment:
This will cause a substantial performance hit, as we will scan all data and delete manifests that match the filter for each changelog snapshot, instead of opening only the newly added manifests as before. We may have to do that for deletes anyway, but I wonder about data manifests. Let me think a bit.
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2417568428

@flyrain and @aokolnychyi as you have both expressed interest to review this, I hope you can find time to do so soon! Thank you!
Re: [PR] Support changelog scan for table with delete files [iceberg]
aokolnychyi commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2407845436

Will start looking into this PR today and should be able to finish over the weekend.
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2374326949

@flyrain you worked on this area and implemented some of the changelog support; can you please review?
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2373258387

@RussellSpitzer, @aokolnychyi: would it be possible to take a look at this PR? I did my best to review it, but I'm not an expert in this part of the code. Thanks, Peter
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2372741865

@pvary can you please help move this forward then?
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2367649336

This looks good to me. @dramaticlly: Any more comments before we try to involve the folks who are more experienced with the core/Spark parts?
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1771032932

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:

```diff
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
```

Review Comment:
Cool! Thanks for the explanation!
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1771031788

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:

```diff
@@ -132,6 +131,139 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testPositionDeletes() {
```

Review Comment:
Thanks! I like your new tests!
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767624705

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
## @@ -132,6 +131,175 @@ public void testFileDeletes() {

Review Comment:
Ah, you mean compaction as in rewriting data files. Compaction does not result in any inserts or deletes as far as the changelog is concerned. (I believe this is what we both expect.) Please see `testDataRewritesAreIgnored`, which I added in `TestChangelogTable` in this PR. This test is in spark-extensions because I want to call the `rewrite_data_files` Spark procedure. (Note that in that test, there is a snapshot between `snap2` and `snap3`, namely the snapshot where the rewrite_data_files is committed, but I don't capture it, because no change for it shows up in the changelog, which is the point. In `snap3`, another delete occurs, and that shows up in the changelog.)
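For readers following along, the mechanism that makes rewrites invisible is the changelog scan's snapshot filter, which skips `replace` snapshots. A simplified sketch (the `DataOperations.REPLACE` check reflects the actual scan; the surrounding helper is condensed and its names assumed):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

class ChangelogSnapshotFilterSketch {
  // collect snapshots in (fromIdExclusive, toId], oldest first, skipping
  // compaction/replace snapshots so they never emit changelog rows
  static Deque<Snapshot> changelogSnapshots(Table table, long fromIdExclusive, long toId) {
    Deque<Snapshot> snapshots = new ArrayDeque<>();
    for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table, toId, fromIdExclusive)) {
      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
        snapshots.addFirst(snapshot);
      }
    }
    return snapshots;
  }
}
```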
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767544018

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
## @@ -132,6 +131,175 @@ public void testFileDeletes() {

Review Comment:
This is the question of how you see compaction, where we rewrite the files with the express goal of keeping the actual content unchanged.
- Is this a change where we remove and add files which coincidentally contain the same data?
- Is this a change where no data has changed?

I see the reasoning behind both cases, but using the first approach would most probably make the changelog scan unfeasible for cases where compaction happens on the table.
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767321018

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
## @@ -132,6 +131,175 @@ public void testFileDeletes() {

Review Comment:
P.S. Net changes are something a user might want regardless of whether merge-on-read or copy-on-write is used, so that is an orthogonal concern.
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767319143

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
## @@ -132,6 +131,175 @@ public void testFileDeletes() {

Review Comment:
I'm not sure I understand your question. Are you referring to outputting *net changes*? If so, I think that should be a separate PR; for that, I think we should introduce an option. If you're referring to something else, can you please explain?
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767295969

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:

```diff
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+    Snapshot snap4 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+    List tasks = plan(scan);
+
+    assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+    DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+    assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+    assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
+    assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
+    assertThat(t1.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_B_DELETES.path());
+    assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+    assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+    assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId());
+    assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_C.path());
+    assertThat(t2.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_C2_DELETES.path());
+    assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+    assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+    assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
+    assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
+    assertThat(t3.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
+    assertThat(t3.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A2_DELETES.path());
+    assertThat(t3.addedDeletes().get(1).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A_DELETES.path());
```

Review Comment:
The test is not flaky, but you have a good point. We need not depend on the order. We can follow a variant of @dramaticlly's suggestion below:

```java
assertThat(t3.addedDeletes())
    .hasSize(2)
    .extracting(DeleteFile::path)
    .as("Added delete file must match")
    .containsExactlyInAnyOrder(FILE_A2_DELETES.path(), FILE_A_DELETES.path());
```
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767285190

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
## @@ -132,6 +131,175 @@ public void testFileDeletes() {

Review Comment:
That sounds good. Thanks!
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767264496

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
## @@ -132,6 +131,175 @@ public void testFileDeletes() {

Review Comment:
If I understand this correctly, the test result seems to be consistent here: when building the `DeleteFileIndex`, we always concat equality deletes (A2_DELETES) before position deletes (A_DELETES) for a given data file entry, in https://github.com/apache/iceberg/blob/9dbfbbb203c007592b8ba5f4816925043c08923a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java#L139, and the array ordering seems to be maintained throughout the code. IMO, this is not necessarily something we need to assert the ordering on.
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767226037

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
## @@ -132,6 +131,175 @@ public void testFileDeletes() {

Review Comment:
In that sense, I believe we can probably simplify the assertion on `addedDeletes` to this (with code reduction):

```java
assertThat(t3.addedDeletes())
    .hasSize(2)
    .extracting(DeleteFile::path)
    .as("Added delete file must match")
    .containsExactly(FILE_A2_DELETES.path(), FILE_A_DELETES.path());
```
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1766615044

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
## @@ -132,6 +131,175 @@ public void testFileDeletes() {

Review Comment:
One more interesting question: how will we handle compaction commits? Is there a better way than emitting every insert/delete record? Could we skip them? Is this trivial to fix in this PR, or does it need another PR?
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1766608610

## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
## @@ -132,6 +131,175 @@ public void testFileDeletes() {

Review Comment:
Would this be flaky? Is there a guarantee for the order of the files?
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1765955792 ## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java: ## @@ -132,6 +131,175 @@ public void testFileDeletes() { assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty(); } + @TestTemplate + public void testRowDeletes() { +assumeThat(formatVersion).isEqualTo(2); + +table +.newFastAppend() +.appendFile(FILE_A) +.appendFile(FILE_A2) +.appendFile(FILE_B) +.appendFile(FILE_C) +.commit(); +Snapshot snap1 = table.currentSnapshot(); + +// position delete +table.newRowDelta().addDeletes(FILE_B_DELETES).commit(); +Snapshot snap2 = table.currentSnapshot(); + +// equality delete +table.newRowDelta().addDeletes(FILE_C2_DELETES).commit(); +Snapshot snap3 = table.currentSnapshot(); + +// mix of position and equality deletes + table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); +Snapshot snap4 = table.currentSnapshot(); + +IncrementalChangelogScan scan = + newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId()); + +List tasks = plan(scan); + +assertThat(tasks).as("Must have 4 tasks").hasSize(4); + +DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0); +assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0); +assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId()); +assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path()); +assertThat(t1.addedDeletes().get(0).path()) +.as("Added delete file must match") +.isEqualTo(FILE_B_DELETES.path()); +assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty(); + +DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1); +assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1); +assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId()); +assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_C.path()); +assertThat(t2.addedDeletes().get(0).path()) +.as("Added delete file must match") +.isEqualTo(FILE_C2_DELETES.path()); +assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty(); + +DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2); +assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2); +assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId()); +assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_A2.path()); +assertThat(t3.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2); +assertThat(t3.addedDeletes().get(0).path()) +.as("Added delete file must match") +.isEqualTo(FILE_A2_DELETES.path()); +assertThat(t3.addedDeletes().get(1).path()) +.as("Added delete file must match") +.isEqualTo(FILE_A_DELETES.path()); +assertThat(t3.existingDeletes()).as("Must be no existing deletes").isEmpty(); + +DeletedRowsScanTask t4 = (DeletedRowsScanTask) tasks.get(3); +assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2); +assertThat(t4.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId()); +assertThat(t4.file().path()).as("Data file must match").isEqualTo(FILE_A.path()); +assertThat(t4.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2); +assertThat(t4.addedDeletes().get(0).path()) +.as("Added delete file must match") +.isEqualTo(FILE_A2_DELETES.path()); +assertThat(t4.addedDeletes().get(1).path()) +.as("Added delete file must match") 
+.isEqualTo(FILE_A_DELETES.path()); +assertThat(t4.existingDeletes()).as("Must be no existing deletes").isEmpty(); Review Comment: Thanks for thinking about this and the idea. I learned something about AssertJ from you! As I see it, however, the example you show does not actually reduce the code; it just organizes it differently. Also, as far as using the `RecursiveComparisonConfiguration` goes, that part really does not reduce code at all. Conceptually, we want to check what the deletes/addedDeletes/existingDeletes are, and it is enough to check that the file path matches the path of the `DeleteFile`s we expect. In other words, ``` assertThat(t4.addedDeletes().get(0).path()) .isEqualTo(FILE_A2_DELETES.path()); ``` is just as good as ``` assertThat(t4.addedDeletes().get(0)) .usingRecursiveComparison(FILE_COMPARISON_CONFIG) .isEqualTo(FILE_A2_DELETES); ``` for our purposes, and is slightly shorter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
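A small helper could keep the path-based style while trimming the repetition across tasks; a sketch under the same fixtures, assuming `java.util.Arrays` plus static AssertJ imports (`assertAddedDeletePaths` is an invented name, not part of the PR):
```java
// Hypothetical helper for the test class: asserts that a task's added delete
// files match the expected delete files by path, in order.
private static void assertAddedDeletePaths(DeletedRowsScanTask task, DeleteFile... expected) {
  assertThat(task.addedDeletes())
      .extracting(deleteFile -> deleteFile.path().toString())
      .containsExactly(
          Arrays.stream(expected).map(f -> f.path().toString()).toArray(String[]::new));
}
```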
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1765491165 ## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java: ## @@ -132,6 +131,175 @@ public void testFileDeletes() { assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty(); } + @TestTemplate + public void testRowDeletes() { +assumeThat(formatVersion).isEqualTo(2); + +table +.newFastAppend() +.appendFile(FILE_A) +.appendFile(FILE_A2) +.appendFile(FILE_B) +.appendFile(FILE_C) +.commit(); +Snapshot snap1 = table.currentSnapshot(); + +// position delete +table.newRowDelta().addDeletes(FILE_B_DELETES).commit(); +Snapshot snap2 = table.currentSnapshot(); + +// equality delete +table.newRowDelta().addDeletes(FILE_C2_DELETES).commit(); +Snapshot snap3 = table.currentSnapshot(); + +// mix of position and equality deletes + table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); +Snapshot snap4 = table.currentSnapshot(); + +IncrementalChangelogScan scan = + newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId()); + +List tasks = plan(scan); + +assertThat(tasks).as("Must have 4 tasks").hasSize(4); + +DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0); +assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0); +assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId()); +assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path()); +assertThat(t1.addedDeletes().get(0).path()) +.as("Added delete file must match") +.isEqualTo(FILE_B_DELETES.path()); +assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty(); + +DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1); +assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1); +assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId()); +assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_C.path()); +assertThat(t2.addedDeletes().get(0).path()) +.as("Added delete file must match") +.isEqualTo(FILE_C2_DELETES.path()); +assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty(); + +DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2); +assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2); +assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId()); +assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_A2.path()); +assertThat(t3.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2); +assertThat(t3.addedDeletes().get(0).path()) +.as("Added delete file must match") +.isEqualTo(FILE_A2_DELETES.path()); +assertThat(t3.addedDeletes().get(1).path()) +.as("Added delete file must match") +.isEqualTo(FILE_A_DELETES.path()); +assertThat(t3.existingDeletes()).as("Must be no existing deletes").isEmpty(); + +DeletedRowsScanTask t4 = (DeletedRowsScanTask) tasks.get(3); +assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2); +assertThat(t4.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId()); +assertThat(t4.file().path()).as("Data file must match").isEqualTo(FILE_A.path()); +assertThat(t4.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2); +assertThat(t4.addedDeletes().get(0).path()) +.as("Added delete file must match") +.isEqualTo(FILE_A2_DELETES.path()); +assertThat(t4.addedDeletes().get(1).path()) +.as("Added delete file must 
match") +.isEqualTo(FILE_A_DELETES.path()); +assertThat(t4.existingDeletes()).as("Must be no existing deletes").isEmpty(); Review Comment: I believe this is the core of the test I am looking for! I think there's some feature we can leverage in assertJ so I took the stub to organize this in a little different way. I do notice that the we use the file path instead of object itself for asserting equality on data and delete files, probably because once added to the table, its data and file sequence number are inherited from manifest entry. I think we can probably reuse this `FILE_COMPARISON_CONFIG` from another class [`TestManifestReader`](https://github.com/apache/iceberg/blob/5ea78e3fbe5d8c9c846a1b86bcd2b77d13a31acd/core/src/test/java/org/apache/iceberg/TestManifestReader.java#L40) ```java private static final RecursiveComparisonConfiguration FILE_COMPARISON_CONFIG = RecursiveComparisonConfiguration.builder() .withIgnoredFields( "dataSequenceNumber", "fileOrdinal", "fileSequenceNumber", "fromProj
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1764249470 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +128,124 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); -private final Map snapshotOrdinals; +private DummyChangelogScanTask() {} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final long sequenceNumber; +private final int changeOrdinal; + +CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int changeOrdinal) { + this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; + this.changeOrdinal = changeOrdinal; } @Override public CloseableIterable apply( CloseableIterable> entries, TaskContext context) { - return CloseableIterable.transform( - entries, - entry -> { -long commitSnapshotId = entry.snapshotId(); -int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); -DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - -switch (entry.status()) { - case ADDED: -return new BaseAddedRowsScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - case DELETED: -return new BaseDeletedDataFileScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - default: -throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); -} - }); + CloseableIterable tasks = + CloseableIterable.transform( + entries, + entry -> { +long entrySnapshotId = entry.snapshotId(); +DataFile dataFile = entry.file().copy(context.shouldKeepStats()); +DeleteFile[] deleteFiles = context.deletes().forEntry(entry); +List added = Lists.newArrayList(); +List existing = Lists.newArrayList(); +for (DeleteFile deleteFile : deleteFiles) { + if (sequenceNumber == deleteFile.dataSequenceNumber()) { +added.add(deleteFile); + } else { +existing.add(deleteFile); + } +} +DeleteFile[] addedDeleteFiles = added.toArray(new DeleteFile[0]); +DeleteFile[] existingDeleteFiles = existing.toArray(new DeleteFile[0]); Review Comment: @pvary this was the bug I mentioned that I needed to fix before pushing a change with additional tests in `TestBaseIncrementalChangelogScan`. In `TestBaseIncrementalChangelogScan`, in `testDeletingDataFileWithExistingDeletes` and `testDeletingRowsInDataFileWithExistingDeletes`, if the `IncrementalChangelogScan` was from `snap1` to `snap3`, the bug was obscured; but with a scan from `snap2` to `snap3`, those tests revealed the bug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1763998376 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +128,124 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); -private final Map snapshotOrdinals; +private DummyChangelogScanTask() {} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final long sequenceNumber; +private final int changeOrdinal; + +CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int changeOrdinal) { + this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; + this.changeOrdinal = changeOrdinal; } @Override public CloseableIterable apply( CloseableIterable> entries, TaskContext context) { - return CloseableIterable.transform( - entries, - entry -> { -long commitSnapshotId = entry.snapshotId(); -int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); -DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - -switch (entry.status()) { - case ADDED: -return new BaseAddedRowsScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - case DELETED: -return new BaseDeletedDataFileScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - default: -throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); -} - }); + CloseableIterable tasks = + CloseableIterable.transform( + entries, + entry -> { +long entrySnapshotId = entry.snapshotId(); +DataFile dataFile = entry.file().copy(context.shouldKeepStats()); +DeleteFile[] deleteFiles = context.deletes().forEntry(entry); +List added = Lists.newArrayList(); +List existing = Lists.newArrayList(); +for (DeleteFile deleteFile : deleteFiles) { + if (sequenceNumber == deleteFile.dataSequenceNumber()) { +added.add(deleteFile); + } else { +existing.add(deleteFile); + } +} +DeleteFile[] addedDeleteFiles = added.toArray(new DeleteFile[0]); +DeleteFile[] existingDeleteFiles = existing.toArray(new DeleteFile[0]); Review Comment: Previously, I used a map from each delete file to the snapshot where it was added, to determine the `addedDeleteFiles` and `existingDeleteFiles`. However, this map was computed only for the snapshots in the range of interest (for the `BaseIncrementalChangelogScan`), so there may not be an entry in the map for a delete file that was added in a snapshot before this range. This caused issues in the `FluentIterable::filter(Predicate)` I was using to filter the delete files. This approach is simpler, and turns out to be what @manuzhang used in his https://github.com/apache/iceberg/pull/9888. Hat tip to Manu. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
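The rule the new code relies on can be stated compactly; a condensed restatement of the comparison in the diff above (`isAddedBy` is an illustrative name, not part of the PR):
```java
// A delete file was added by the snapshot being processed iff its data
// sequence number equals that snapshot's sequence number; a smaller value
// means it was committed earlier (possibly before the scanned range), so it
// counts as an existing delete.
boolean isAddedBy(DeleteFile deleteFile, long snapshotSequenceNumber) {
  return deleteFile.dataSequenceNumber() == snapshotSequenceNumber;
}
```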
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1763979054 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) Review Comment: These methods have been removed in the latest version, so this is no longer relevant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1763976533 ## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java: ## @@ -132,6 +131,139 @@ public void testFileDeletes() { assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty(); } + @TestTemplate + public void testPositionDeletes() { Review Comment: Please look at the new consolidated test, `testRowDeletes`. I think that at the level at which we can test `BaseIncrementalChangelogScan`, it doesn't matter whether delete files are position delete files or equality delete files (they only matter when we actually apply the deletes, which doesn't happen at this level), so `testPositionDeletes` and `testEqualityDeletes` didn't really test different cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1762795906 ## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java: ## @@ -132,6 +131,139 @@ public void testFileDeletes() { assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty(); } + @TestTemplate + public void testPositionDeletes() { Review Comment: @wypoon: We usually try to create different tests for different test cases, so your original approach seems nice. Could we remove the code duplication with parameterized tests, or by externalizing the duplicated code into shared test methods? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1762263665 ## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java: ## @@ -132,6 +131,139 @@ public void testFileDeletes() { assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty(); } + @TestTemplate + public void testPositionDeletes() { Review Comment: Thank you for the review. I consolidated `testPositionDeletes`, `testEqualityDeletes` and `testMixOfPositionAndEqualityDeletes` into one test to reduce repetition as well as to test a scan with multiple snapshots. I also added tests for scan tasks with existing deletes. In the process, I discovered a bug in some edge cases and need to work on a fix before I push an update. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1759686766 ## core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java: ## @@ -132,6 +131,139 @@ public void testFileDeletes() { assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty(); } + @TestTemplate + public void testPositionDeletes() { Review Comment: Nice stuff! Could we have 2 more test cases? 1. It would be nice to test the existing-deletes scenario. 2. It would be nice to test when the scan returns results for multiple snapshots. Thanks, Peter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1759685604 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { Review Comment: Thanks! I missed that we are iterating through the manifest files, and not the elements of the files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1759573665 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { Review Comment: I see where you're coming from. Before this change, there is only one call to `ManifestGroup:plan`. The `ManifestGroup` contains manifests from all the snapshots in the range of interest, only manifest entries added in one of these snapshots are considered and manifests with only existing entries are ignored; then one call to `plan` generates the scan tasks. This approach only works for the copy-on-write case where there are no delete files; in the copy-on-write case, when deletes occur, new data files are always added and so will show up in the `ManifestGroup`. When we have delete files, a situation as you describe can easily arise, where there is no new data file added in a snapshot, only new delete files. That is why we need a different approach. With this change, there is a `ManifestGroup` created for each snapshot in the range of interest and a call to `ManifestGroup:plan` for it; then the `CloseableIterable` of scan tasks for all of these are concatenated into one `CloseableIterable`. We no longer ignore data manifests with existing entries; in fact, we have to consider them. In a snapshot with no added data files, just added delete files, we consider the existing data files and see if any deletes apply to them. 
Now, each call to `ManifestGroup:plan` is still only parallelizable according to the number of data manifest files in that `ManifestGroup`. Additional parallelism is possible in principle: since we are making multiple `ManifestGroup:plan` calls (on different `ManifestGroup` instances), we could distribute the units of work from every call across one executor service. However, I'm not aware that we have the API for that; it is not easy to do with the API we have, I think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
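Structurally, the approach described above is one plan per snapshot with the results concatenated; a simplified sketch of that shape, where `planTasksFor` is a stand-in for the per-snapshot `ManifestGroup` setup shown in the diffs:
```java
// One ManifestGroup.plan(...) call per changelog snapshot; the per-snapshot
// task iterables are stitched back into a single lazily evaluated iterable.
// planTasksFor(...) is a hypothetical extraction of the ManifestGroup setup.
Iterable<CloseableIterable<ChangelogScanTask>> plans =
    FluentIterable.from(changelogSnapshots).transform(this::planTasksFor);
return CloseableIterable.concat(plans);
```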
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2348057437 > @pvary I added some tests in `TestBaseIncrementalChangelogScan`. However, at that level, we can only check what scan tasks (`AddedRowsScanTask`, `DeletedRowsScanTask`, `DeletedDataFileScanTask`) are generated. We cannot actually check the kind of results as we can in the Spark `TestChangelogReader`. Right now, the only consumer of `IncrementalChangelogScan` is Spark, so some of the functionality needs to be tested through Spark. Makes sense. Give me some time to review the added tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758196928 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { Review Comment: I am concerned about a situation where there are no new data files in a snapshot. Let's say we only delete a single row with either a positional or an equality delete. How do we handle this situation? Or when we have multiple of these delete files, but no data files at all? Thanks for your commitment and efforts. Great to see this feature progressing! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2348055779 @pvary I added some tests in `TestBaseIncrementalChangelogScan`. However, at that level, we can only check what scan tasks (`AddedRowsScanTask`, `DeletedRowsScanTask`, `DeletedDataFileScanTask`) are generated. We cannot actually check the kind of results as we can in the Spark `TestChangelogReader`. Right now, the only consumer of `IncrementalChangelogScan` is Spark, so some of the functionality needs to be tested through Spark. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758193054 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) + .toArray(DeleteFile.class); } @Override public CloseableIterable apply( CloseableIterable> entries, TaskContext context) { - return CloseableIterable.transform( - entries, - entry -> { -long commitSnapshotId = entry.snapshotId(); -int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); -DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - -switch (entry.status()) { - case ADDED: -return new BaseAddedRowsScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - case DELETED: -return new BaseDeletedDataFileScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - default: -throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); -} - }); + CloseableIterable tasks = + CloseableIterable.transform( + entries, + entry -> { +long entrySnapshotId = entry.snapshotId(); +DataFile dataFile = entry.file().copy(context.shouldKeepStats()); +DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry)); + +switch (entry.status()) { + case ADDED: +if (entrySnapshotId == snapshotId) { + return new BaseAddedRowsScanTask( + changeOrdinal, + snapshotId, + dataFile, + addedDeleteFiles, + context.schemaAsString(), + context.specAsString(), +
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758191734 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { Review Comment: As I wrote, a unit of work is processing a data manifest file. If we have multiple data manifest files (`dataManifests.size() > 1` is true), they can be parallelized. For each unit of work, we produce scan tasks for the manifest entries in the manifest. Each manifest entry (a data file) can have some number of delete files (whether position or equality deletes) associated with it, but that number doesn't affect the parallelizability of the work. I'm really not sure I understand your question. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
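For context, a caller opts into this per-manifest parallelism by passing an executor to the scan; a usage sketch with placeholder snapshot IDs and pool size (not taken from the PR):
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// planWith(...) only parallelizes the per-manifest work inside each plan call;
// the number of data manifests bounds the achievable parallelism.
ExecutorService workerPool = Executors.newFixedThreadPool(4); // hypothetical pool size
try (CloseableIterable<ChangelogScanTask> tasks =
    table
        .newIncrementalChangelogScan()
        .fromSnapshotExclusive(fromId) // fromId/toId are placeholder snapshot IDs
        .toSnapshot(toId)
        .planWith(workerPool)
        .planFiles()) {
  tasks.forEach(task -> System.out.println(task.operation()));
} finally {
  workerPool.shutdown();
}
```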
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758186873 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) + .toArray(DeleteFile.class); } @Override public CloseableIterable apply( CloseableIterable> entries, TaskContext context) { - return CloseableIterable.transform( - entries, - entry -> { -long commitSnapshotId = entry.snapshotId(); -int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); -DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - -switch (entry.status()) { - case ADDED: -return new BaseAddedRowsScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - case DELETED: -return new BaseDeletedDataFileScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - default: -throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); -} - }); + CloseableIterable tasks = + CloseableIterable.transform( + entries, + entry -> { +long entrySnapshotId = entry.snapshotId(); +DataFile dataFile = entry.file().copy(context.shouldKeepStats()); +DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry)); + +switch (entry.status()) { + case ADDED: +if (entrySnapshotId == snapshotId) { + return new BaseAddedRowsScanTask( + changeOrdinal, + snapshotId, + dataFile, + addedDeleteFiles, + context.schemaAsString(), + context.specAsString(), +
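For readers following the diff above (which lost its generic type parameters in transit), the bookkeeping under discussion is: the changelog snapshots are assigned consecutive ordinals oldest-to-newest, and each delete file is keyed by its path and mapped to the ordinal of the snapshot that added it. A minimal, self-contained sketch of that mapping, simplified to bare snapshot ids (the real code walks `Snapshot` objects and uses `FileIO`, both omitted here):
```
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: assign ordinals 0..n-1 to changelog snapshots in commit order,
// then key each delete file path to the ordinal of its adding snapshot.
class OrdinalSketch {
  static Map<Long, Integer> computeSnapshotOrdinals(Deque<Long> snapshotIds) {
    Map<Long, Integer> ordinals = new LinkedHashMap<>();
    int ordinal = 0;
    for (long id : snapshotIds) {
      ordinals.put(id, ordinal++);
    }
    return ordinals;
  }

  public static void main(String[] args) {
    Deque<Long> ids = new ArrayDeque<>();
    ids.add(100L); // oldest changelog snapshot -> ordinal 0
    ids.add(101L); // next snapshot            -> ordinal 1
    Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(ids);

    // delete file "d1.parquet" was added by snapshot 101:
    Map<String, Integer> deleteFileToSnapshotOrdinal = new LinkedHashMap<>();
    deleteFileToSnapshotOrdinal.put("d1.parquet", snapshotOrdinals.get(101L));

    System.out.println(snapshotOrdinals);            // {100=0, 101=1}
    System.out.println(deleteFileToSnapshotOrdinal); // {d1.parquet=1}
  }
}
```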
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758181104 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { Review Comment: How does the planning handle when the snapshot only has a few equality delete files? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757711124 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) + .toArray(DeleteFile.class); } @Override public CloseableIterable apply( CloseableIterable> entries, TaskContext context) { - return CloseableIterable.transform( - entries, - entry -> { -long commitSnapshotId = entry.snapshotId(); -int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); -DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - -switch (entry.status()) { - case ADDED: -return new BaseAddedRowsScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - case DELETED: -return new BaseDeletedDataFileScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - default: -throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); -} - }); + CloseableIterable tasks = + CloseableIterable.transform( + entries, + entry -> { +long entrySnapshotId = entry.snapshotId(); +DataFile dataFile = entry.file().copy(context.shouldKeepStats()); +DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry)); + +switch (entry.status()) { + case ADDED: +if (entrySnapshotId == snapshotId) { + return new BaseAddedRowsScanTask( + changeOrdinal, + snapshotId, + dataFile, + addedDeleteFiles, + context.schemaAsString(), + context.specAsString(), +
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757707990 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) + .toArray(DeleteFile.class); } @Override public CloseableIterable apply( CloseableIterable> entries, TaskContext context) { - return CloseableIterable.transform( - entries, - entry -> { -long commitSnapshotId = entry.snapshotId(); -int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); -DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - -switch (entry.status()) { - case ADDED: -return new BaseAddedRowsScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - case DELETED: -return new BaseDeletedDataFileScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - default: -throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); -} - }); + CloseableIterable tasks = + CloseableIterable.transform( + entries, + entry -> { +long entrySnapshotId = entry.snapshotId(); +DataFile dataFile = entry.file().copy(context.shouldKeepStats()); +DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry)); Review Comment: You are right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757706938 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) Review Comment: I'm not sure I understand the suggestion. Perhaps you can elaborate? I thought the code here is quite clear and the intention is apparent from the method name. Thus `filterAdded` filters for delete files whose snapshot ordinal is the same as (`==`) `changeOrdinal`, while `filterExisting` filters for delete files whose snapshot ordinal comes before (`<`) `changeOrdinal`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
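The semantics wypoon describes can be seen in a toy run: delete files whose ordinal equals the current snapshot's ordinal are "added" by this snapshot, and strictly smaller ordinals are "existing" history. The paths and ordinals below are made up for illustration:
```
import java.util.Arrays;
import java.util.Map;
import java.util.function.IntPredicate;

// Toy demonstration of the filterAdded (==) vs filterExisting (<) split.
class FilterSemantics {
  static String[] filter(String[] paths, Map<String, Integer> ordinals, IntPredicate test) {
    return Arrays.stream(paths)
        .filter(p -> test.test(ordinals.get(p)))
        .toArray(String[]::new);
  }

  public static void main(String[] args) {
    Map<String, Integer> ordinals = Map.of("old-delete.parquet", 0, "new-delete.parquet", 1);
    String[] all = {"old-delete.parquet", "new-delete.parquet"};
    int changeOrdinal = 1; // the snapshot currently being processed

    System.out.println(Arrays.toString(filter(all, ordinals, o -> o == changeOrdinal)));
    // [new-delete.parquet] -> "added" deletes for this snapshot
    System.out.println(Arrays.toString(filter(all, ordinals, o -> o < changeOrdinal)));
    // [old-delete.parquet] -> "existing" deletes from earlier snapshots
  }
}
```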
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757701740 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java: ## @@ -112,13 +149,62 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); Review Comment: I'm adding a method `projectRow`, so that the transform can be written as `row -> projectRow(indexes)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
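A plausible shape for that helper, inferred from the transform bodies in the diff: the name `projectRow` comes from the comment, but the exact signature is an assumption, in which case the transform would read `row -> projectRow(row, indexes)`. `columns` and `sparkColumnTypes` are the reader fields already used in the diff above:
```
// Hypothetical helper (signature assumed, not from the PR): project the
// positions named by `indexes` out of a source row into a fixed-width
// output row matching the requested changelog columns.
private InternalRow projectRow(InternalRow row, int[] indexes) {
  GenericInternalRow projected = new GenericInternalRow(columns.length);
  for (int i = 0; i < columns.length; i++) {
    projected.update(i, row.get(indexes[i], sparkColumnTypes[i]));
  }
  return projected;
}
```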
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757688389 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java: ## @@ -112,13 +149,62 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); Review Comment: The code looks the same, but it is not the same transform as it is a closure, and `indexes` in each closure is different, as `indexes` is a function of the `SparkDeleteFilter` in each case and that is different. I didn't try too hard to avoid the code repetition here, as I didn't feel it was worth the effort, but let me think a bit more. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
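The closure point generalizes beyond this reader: two lambdas with identical bodies are still distinct functions when they capture different state. A standalone illustration:
```
import java.util.function.IntUnaryOperator;

// Identical lambda bodies, different captured arrays: distinct behavior.
class ClosureDemo {
  static IntUnaryOperator addFirst(int[] offsets) {
    return x -> x + offsets[0]; // captures `offsets`
  }

  public static void main(String[] args) {
    IntUnaryOperator a = addFirst(new int[] {1});
    IntUnaryOperator b = addFirst(new int[] {10});
    System.out.println(a.applyAsInt(5)); // 6
    System.out.println(b.applyAsInt(5)); // 15
  }
}
```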
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757679042 ## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java: ## @@ -191,39 +214,359 @@ public void testMixDeleteAndInsert() throws IOException { table.newAppend().appendFile(dataFile2).commit(); long snapshotId3 = table.currentSnapshot().snapshotId(); -CloseableIterable> taskGroups = newScan().planTasks(); +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testPositionDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); +long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +List> deletes = +Lists.newArrayList( +Pair.of(dataFile1.path(), 0L), // id = 29 +Pair.of(dataFile1.path(), 3L), // id = 89 +Pair.of(dataFile2.path(), 2L) // id = 122 +); + +Pair posDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +deletes); + +table +.newRowDelta() +.addDeletes(posDeletes.first()) +.validateDataFilesExist(posDeletes.second()) +.commit(); +long snapshotId3 = table.currentSnapshot().snapshotId(); + +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testEqualityDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); +long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +Schema deleteRowSchema = table.schema().select("data"); +Record dataDelete = GenericRecord.create(deleteRowSchema); +List dataDeletes = +Lists.newArrayList( +dataDelete.copy("data", "a"), // id = 29 +dataDelete.copy("data", "d"), // id = 89 +dataDelete.copy("data", "g") // id = 122 +); + +DeleteFile eqDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +dataDeletes, +deleteRowSchema); + +table.newRowDelta().addDeletes(eqDeletes).commit(); +long snapshotId3 = table.currentSnapshot().snapshotId(); + +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testMixOfPositionAndEqualityDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); 
+long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +List> deletes = +Lists.newArrayList( +Pair.of(dataFile1.path(), 0L), // id = 29 +Pair.of(dataFile1.path(), 3L) // id = 89 +); + +Pair posDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +deletes); + +Schema deleteRowSchema = table.schema().select("data"); +Record dataDelete = GenericRecord.create(deleteRowSchema); +List dataDeletes = +Lists.newArrayList( +dataDelete.copy("data", "a"), // id = 29 +dataDelete.copy("data", "g") // id = 122 +); + +DeleteFile eqDeletes = +FileHelpers.writeDeleteFile( +
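The position-delete setup in these tests, with the generic type parameters that were stripped in transit restored (this matches the `FileHelpers.writeDeleteFile` overload for position deletes, which returns the delete file plus the set of referenced data file paths):
```
List<Pair<CharSequence, Long>> deletes =
    Lists.newArrayList(
        Pair.of(dataFile1.path(), 0L), // id = 29
        Pair.of(dataFile1.path(), 3L), // id = 89
        Pair.of(dataFile2.path(), 2L)  // id = 122
        );

Pair<DeleteFile, CharSequenceSet> posDeletes =
    FileHelpers.writeDeleteFile(
        table,
        Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
        TestHelpers.Row.of(0),
        deletes);

table
    .newRowDelta()
    .addDeletes(posDeletes.first())
    .validateDataFilesExist(posDeletes.second())
    .commit();
```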
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757391061 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { Review Comment: The parallelism in `ManifestGroup::plan` is at the level of data manifest files, i.e., the unit of work is processing a data manifest file. So asking for planning with an executor service if there are many equality delete files does not help if there is only one data manifest file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
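wypoon's answer, restated against the condition in the diff (no behavior change, just the reasoning as comments):
```
// ManifestGroup.plan parallelizes per *data* manifest: each unit of work is
// reading one data manifest, while delete manifests are first loaded into a
// delete file index. A snapshot with a single data manifest and many
// equality delete files therefore gains nothing from an executor here,
// which is why only dataManifests.size() is consulted.
if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
  manifestGroup = manifestGroup.planWith(planExecutor());
}
```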
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1755259576 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); Review Comment: Yeah, I agree with the assessment. ## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java: ## @@ -191,39 +214,359 @@ public void testMixDeleteAndInsert() throws IOException { table.newAppend().appendFile(dataFile2).commit(); long snapshotId3 = table.currentSnapshot().snapshotId(); -CloseableIterable> taskGroups = newScan().planTasks(); +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testPositionDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); +long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +List> deletes = +Lists.newArrayList( +Pair.of(dataFile1.path(), 0L), // id = 29 +Pair.of(dataFile1.path(), 3L), // id = 89 +Pair.of(dataFile2.path(), 2L) // id = 122 +); + +Pair posDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +deletes); + +table +.newRowDelta() +.addDeletes(posDeletes.first()) +.validateDataFilesExist(posDeletes.second()) +.commit(); +long snapshotId3 = table.currentSnapshot().snapshotId(); + +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + 
public void testEqualityDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); +long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +Schema deleteRowSchema = table.schema().select("data"); +Record dataDelete = GenericRecord.create(deleteRowSchema); +List dataDeletes = +Lists.newArrayList( +dataDelete.copy("data", "a"), // id = 29 +dataDelete.copy("data", "d"), // id = 89 +dataDelete.copy("data", "g") // id = 122 +); + +DeleteFile eqDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +dataDeletes, +deleteRowSchema); + +table
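The equality-delete setup that is cut off above, with its stripped generics restored (the same test appears in full earlier in the thread; this uses the `FileHelpers.writeDeleteFile` overload for equality deletes, which takes the delete records and their schema and returns a single `DeleteFile`):
```
Schema deleteRowSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(deleteRowSchema);
List<Record> dataDeletes =
    Lists.newArrayList(
        dataDelete.copy("data", "a"), // id = 29
        dataDelete.copy("data", "d"), // id = 89
        dataDelete.copy("data", "g")  // id = 122
        );

DeleteFile eqDeletes =
    FileHelpers.writeDeleteFile(
        table,
        Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
        TestHelpers.Row.of(0),
        dataDeletes,
        deleteRowSchema);

table.newRowDelta().addDeletes(eqDeletes).commit();
```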
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1753070517 ## gradle.properties: ## @@ -32,4 +32,4 @@ org.gradle.parallel=true org.gradle.configureondemand=true # explicitly disable the configuration cache org.gradle.configuration-cache=false -org.gradle.jvmargs=-Xmx1024m +org.gradle.jvmargs=-Xmx2048m Review Comment: I reverted the gradle change and the CI passed. It appears to be an intermittent issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2335008435 I did not have time to come back to this work this week as I hoped, but I think I should have some time next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1747872923 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); Review Comment: I suspect that when the dust is settled on the V3 spec, there will be changes needed to the changelog implementation, and not just this part. I agree that we should try to keep the proposed changes in mind, but I don't think we should hold up implementing something for V2. We have to deal with V3 changes when that comes. Right now, the scan tasks we have to work with take `DeleteFile[]` parameters, as they are designed for V2. So for V2 this map makes sense for use in filtering the `DeleteFile`s to pass to those scan task classes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1747869556 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java: ## @@ -112,13 +149,62 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); } private CloseableIterable openDeletedDataFileScanTask(DeletedDataFileScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); + } + + private CloseableIterable openDeletedRowsScanTask(DeletedRowsScanTask task) { +String filePath = task.file().path().toString(); +SparkDeleteFilter existingDeletes = +new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); +SparkDeleteFilter newDeletes = new SparkDeleteFilter(filePath, task.addedDeletes(), counter()); +Schema schema1 = existingDeletes.requiredSchema(); +Schema schema2 = newDeletes.requiredSchema(); +Schema requiredSchema = TypeUtil.join(schema1, schema2); Review Comment: Sure, that is a minor thing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
manuzhang commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1740294635 ## gradle.properties: ## @@ -32,4 +32,4 @@ org.gradle.parallel=true org.gradle.configureondemand=true # explicitly disable the configuration cache org.gradle.configuration-cache=false -org.gradle.jvmargs=-Xmx1024m +org.gradle.jvmargs=-Xmx2048m Review Comment: Did it just fail here? What's changed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2313986585 @pvary and @dramaticlly, thank you both for reviewing. I am busy with some other work at the moment, but I'll return to this by next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733659899 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java: ## @@ -112,13 +149,62 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); } private CloseableIterable openDeletedDataFileScanTask(DeletedDataFileScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); + } + + private CloseableIterable openDeletedRowsScanTask(DeletedRowsScanTask task) { +String filePath = task.file().path().toString(); +SparkDeleteFilter existingDeletes = +new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); +SparkDeleteFilter newDeletes = new SparkDeleteFilter(filePath, task.addedDeletes(), counter()); +Schema schema1 = existingDeletes.requiredSchema(); +Schema schema2 = newDeletes.requiredSchema(); +Schema requiredSchema = TypeUtil.join(schema1, schema2); Review Comment: can we inline this to ``` Schema requiredSchema = TypeUtil.join(existingDeletes.requiredSchema(), newDeletes.requiredSchema()); ``` As no other use of schema1 and schema2 below -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733661588 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) Review Comment: Can we use a comparator here and line 190 to highlight the difference between added deletes and existing deletes? I think right now you are comparing the ordinal but it's a bit hard to see as the only difference is equality and less than. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
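One way to read dramaticlly's suggestion (an illustrative refactor, not code from the PR; `filterByOrdinal` is a made-up name): collapse the two near-identical methods into one parameterized by a predicate on the ordinal, so the `==` vs `<` distinction is spelled out at the call sites.
```
import java.util.function.IntPredicate;

// Illustrative refactor: one filter, with the ordinal comparison supplied
// by the caller. Uses the same FluentIterable style as the diff above.
private DeleteFile[] filterByOrdinal(DeleteFile[] deleteFiles, IntPredicate ordinalTest) {
  return FluentIterable.from(deleteFiles)
      .filter(f -> ordinalTest.test(deleteFileToSnapshotOrdinal.get(f.path().toString())))
      .toArray(DeleteFile.class);
}

// Call sites make the intent explicit:
// DeleteFile[] added    = filterByOrdinal(deletes, ordinal -> ordinal == changeOrdinal);
// DeleteFile[] existing = filterByOrdinal(deletes, ordinal -> ordinal < changeOrdinal);
```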
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733677814 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); Review Comment: I think this might work for V2, but @aokolnychyi recently proposed a [V3 improvement for position deletes](https://docs.google.com/document/d/18Bqhr-vnzFfQk1S4AgRISkA_5_m5m32Nnc2Cw0zn2XM) to leverage Puffin files, such that many delete files will have the same path but different offsets; so keying by path, or path-based equality, might not be the best. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733661339 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) + .toArray(DeleteFile.class); } Review Comment: Can we use a comparator here to highlight the difference between added deletes and existing deletes? I think you can compare it with the snapshot ordinal but it's a bit hard to see as the only difference is equality and less than. 
## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = dele
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733659899 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java: ## @@ -112,13 +149,62 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); } private CloseableIterable openDeletedDataFileScanTask(DeletedDataFileScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); + } + + private CloseableIterable openDeletedRowsScanTask(DeletedRowsScanTask task) { +String filePath = task.file().path().toString(); +SparkDeleteFilter existingDeletes = +new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); +SparkDeleteFilter newDeletes = new SparkDeleteFilter(filePath, task.addedDeletes(), counter()); +Schema schema1 = existingDeletes.requiredSchema(); +Schema schema2 = newDeletes.requiredSchema(); +Schema requiredSchema = TypeUtil.join(schema1, schema2); Review Comment: can we inline this to ``` Schema requiredSchema = TypeUtil.join(existingDeletes.requiredSchema(), newDeletes.requiredSchema()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733658370 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java: ## @@ -112,13 +149,62 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); Review Comment: Speak of which, i saw this update to `InternalRow` has been repeated here and below for deletedDataFileScanTask, do you know if we can put it into a method instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
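A sketch of the extraction dramaticlly asks about, using the field and helper names visible in the diff (`columns`, `sparkColumnTypes`, `indexesInRow`); illustrative only:

```
// Projects rows produced under the delete filter's (possibly wider) required
// schema back onto the requested changelog columns, so the three scan-task
// methods can share one code path.
private CloseableIterable<InternalRow> projectRows(
    CloseableIterable<InternalRow> rows, Schema requiredSchema) {
  int[] indexes = indexesInRow(requiredSchema);

  return CloseableIterable.transform(
      rows,
      row -> {
        InternalRow projected = new GenericInternalRow(columns.length);
        for (int i = 0; i < columns.length; i++) {
          projected.update(i, row.get(indexes[i], sparkColumnTypes[i]));
        }
        return projected;
      });
}
```

With such a helper, `openAddedRowsScanTask` would reduce to `return projectRows(deletes.filter(rows(task, deletes.requiredSchema())), deletes.requiredSchema());`, and the deleted-data-file and deleted-rows tasks could reuse it the same way.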
Re: [PR] Support changelog scan for table with delete files [iceberg]
dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733657338 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java: ## @@ -112,13 +149,62 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); Review Comment: yeah I realized this as well when reading from RowDataReader, looks like if there's position delete present it will always add `pos` metadata column as required in https://github.com/apache/iceberg/blob/684f7a767c2c216a402b60b73d2d55ef605921a0/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java#L260-L263, as well as equality delete id if equality delete are present. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2305694139 Could we add some tests for the `BaseIncrementalChangelogScan` directly? It would be good if we don't depend on Spark to test core functionality. Thanks, Peter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
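For reference, a core-only test along the lines pvary asks for could take roughly this shape. It assumes the usual core test fixtures (a `TableTestBase`-style `table` plus prebuilt `FILE_A` / `FILE_A_DELETES` files and AssertJ assertions), so treat the fixture names as assumptions; the scan calls themselves are the public core API:

```
@Test
public void testDeletesProduceDeleteTasks() throws IOException {
  table.newFastAppend().appendFile(FILE_A).commit();
  long fromSnapshotId = table.currentSnapshot().snapshotId();

  table.newRowDelta().addDeletes(FILE_A_DELETES).commit();

  IncrementalChangelogScan scan =
      table.newIncrementalChangelogScan().fromSnapshotExclusive(fromSnapshotId);

  // the only change after fromSnapshotId is a delete, so every task should be a DELETE
  try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
    for (ChangelogScanTask task : tasks) {
      assertThat(task.operation()).isEqualTo(ChangelogOperation.DELETE);
    }
  }
}
```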
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1727847483 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) + .toArray(DeleteFile.class); } @Override public CloseableIterable apply( CloseableIterable> entries, TaskContext context) { - return CloseableIterable.transform( - entries, - entry -> { -long commitSnapshotId = entry.snapshotId(); -int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); -DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - -switch (entry.status()) { - case ADDED: -return new BaseAddedRowsScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - case DELETED: -return new BaseDeletedDataFileScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - default: -throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); -} - }); + CloseableIterable tasks = + CloseableIterable.transform( + entries, + entry -> { +long entrySnapshotId = entry.snapshotId(); +DataFile dataFile = entry.file().copy(context.shouldKeepStats()); +DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry)); + +switch (entry.status()) { + case ADDED: +if (entrySnapshotId == snapshotId) { + return new BaseAddedRowsScanTask( + changeOrdinal, + snapshotId, + dataFile, + addedDeleteFiles, + context.schemaAsString(), + context.specAsString(), +
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1727828125 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -133,51 +131,149 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction { -private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map computeDeleteFileToSnapshotOrdinal( + Deque snapshots, Map snapshotOrdinals) { +Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); + +for (Snapshot snapshot : snapshots) { + Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { +deleteFileToSnapshotOrdinal.put( +deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } +} + +return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { +public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); +private DummyChangelogScanTask() {} + +@Override +public ChangelogOperation operation() { + return ChangelogOperation.DELETE; +} + +@Override +public int changeOrdinal() { + return 0; +} + +@Override +public long commitSnapshotId() { + return 0L; +} + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { +private final long snapshotId; +private final int changeOrdinal; private final Map snapshotOrdinals; +private final Map deleteFileToSnapshotOrdinal; + +CreateDataFileChangeTasks( +long snapshotId, +Map snapshotOrdinals, +Map deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); +} + +private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); +} -CreateDataFileChangeTasks(Deque snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); +private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) + .toArray(DeleteFile.class); } @Override public CloseableIterable apply( CloseableIterable> entries, TaskContext context) { - return CloseableIterable.transform( - entries, - entry -> { -long commitSnapshotId = entry.snapshotId(); -int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); -DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - -switch (entry.status()) { - case ADDED: -return new BaseAddedRowsScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - case DELETED: -return new BaseDeletedDataFileScanTask( -changeOrdinal, -commitSnapshotId, -dataFile, -NO_DELETES, -context.schemaAsString(), -context.specAsString(), -context.residuals()); - - default: -throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); -} - }); + CloseableIterable tasks = + CloseableIterable.transform( + entries, + entry -> { +long entrySnapshotId = entry.snapshotId(); +DataFile dataFile = entry.file().copy(context.shouldKeepStats()); +DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry)); Review Comment: We can reuse `context.deletes().forEntry(entry)` later; it might be worth adding a variable for it and reusing it later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org ---
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1727796525 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { +manifestGroup = manifestGroup.planWith(planExecutor()); + } + + long snapshotId = snapshot.snapshotId(); + return manifestGroup.plan( + new CreateDataFileChangeTasks( + snapshotId, snapshotOrdinals, deleteFileToSnapshotOrdinal)); Review Comment: As agreed on the dev list, let's proceed with the snapshot-by-snapshot reader for now, and decide on the net_changes solution later -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
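To make the composition concrete: each snapshot's `ManifestGroup.plan(...)` yields one `CloseableIterable<ChangelogScanTask>`, and the per-snapshot plans are flattened into a single result. A hedged sketch of that last step (the quoted diff cuts off before the return statement):

```
// CloseableIterable.concat is the real core utility; `plans` is the
// Iterable<CloseableIterable<ChangelogScanTask>> built per snapshot in the
// diff above.
return CloseableIterable.concat(plans);
```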
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1727409290 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { +manifestGroup = manifestGroup.planWith(planExecutor()); + } + + long snapshotId = snapshot.snapshotId(); + return manifestGroup.plan( + new CreateDataFileChangeTasks( + snapshotId, snapshotOrdinals, deleteFileToSnapshotOrdinal)); Review Comment: Ah, in this case, (b) is the correct behavior. The changelog scan is an incremental scan over multiple snapshots in a range, and should emit the changes for each snapshot. This is the current behavior for the supported case, which is copy-on-write. What you are seeking are the net changes, which is functionality that is also supported by Spark, and built on top of the changelog scan. This uses `ChangelogIterator.removeNetCarryovers`. This functionality is exposed in the Spark procedure, `create_changelog_view`. (Of course, one can also use it programmatically.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
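As a pointer for readers, the net-changes path wypoon mentions can be driven from Java roughly like this; the catalog, table, view name, and snapshot IDs below are placeholders:

```
// Hedged usage sketch: build a changelog view over a snapshot range and keep
// only net changes (carryovers removed via ChangelogIterator.removeNetCarryovers).
spark.sql(
    "CALL spark_catalog.system.create_changelog_view("
        + "table => 'db.tbl', "
        + "changelog_view => 'tbl_changes', "
        + "options => map('start-snapshot-id', '1', 'end-snapshot-id', '3'), "
        + "net_changes => true)");

Dataset<Row> netChanges =
    spark.sql("SELECT * FROM tbl_changes ORDER BY _change_ordinal, _commit_snapshot_id");
```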
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726833603 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { +manifestGroup = manifestGroup.planWith(planExecutor()); + } + + long snapshotId = snapshot.snapshotId(); + return manifestGroup.plan( + new CreateDataFileChangeTasks( + snapshotId, snapshotOrdinals, deleteFileToSnapshotOrdinal)); Review Comment: I'm not sure that we can filter with the current snapshotId. I think we need to filter with the `toSnapshotId` of the scan, so we don't emit records which are added and deleted when the `IncrementalScan` is for multiple snapshots. I have asked the corresponding question on the dev list thread, but for reference: > What is the expected behaviour when the `IncrementalScan` is created for not a single snapshot, but for multiple snapshots? > - S1 added PK1-V1 > - S2 updated PK1-V1 to PK1-V1b (removed PK1-V1 and added PK1-V1b) > - S3 updated PK1-V1b to PK1-V1c (removed PK1-V1b and added PK1-V1c) > > Let's say we have IncrementalScan.fromSnapshotInclusive(S1).toSnapshot(S3). > Or we need to return: > (a) > - PK1,V1c,INSERTED > > Or is it ok, to return: > (b) > - PK1,V1,INSERTED > - PK1,V1,DELETED > - PK1,V1b,INSERTED > - PK1,V1b,DELETED > - PK1,V1c,INSERTED I think the (a) is the correct behaviour. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726782294 ## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ## @@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles( return CloseableIterable.empty(); } -Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); +Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); -Set newDataManifests = -FluentIterable.from(changelogSnapshots) -.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) -.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) -.toSet(); - -ManifestGroup manifestGroup = -new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) -.specsById(table().specs()) -.caseSensitive(isCaseSensitive()) -.select(scanColumns()) -.filterData(filter()) -.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) -.ignoreExisting() -.columnsToKeepStats(columnsToKeepStats()); - -if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); -} - -if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); -} +// map of delete file to the snapshot where the delete file is added +// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal +Map deleteFileToSnapshotOrdinal = +computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); -return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); +Iterable> plans = +FluentIterable.from(changelogSnapshots) +.transform( +snapshot -> { + List dataManifests = snapshot.dataManifests(table().io()); + List deleteManifests = snapshot.deleteManifests(table().io()); + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), dataManifests, deleteManifests) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { +manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { Review Comment: Do we want to plan with an executor if we have many equality delete files? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726394464 ## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java: ## @@ -191,39 +214,361 @@ public void testMixDeleteAndInsert() throws IOException { table.newAppend().appendFile(dataFile2).commit(); long snapshotId3 = table.currentSnapshot().snapshotId(); -CloseableIterable> taskGroups = newScan().planTasks(); +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testPositionDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); +long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +List> deletes = +Lists.newArrayList( +Pair.of(dataFile1.path(), 0L), // id = 29 +Pair.of(dataFile1.path(), 3L), // id = 89 +Pair.of(dataFile2.path(), 2L) // id = 122 +); + +Pair posDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +deletes); + +table +.newRowDelta() +.addDeletes(posDeletes.first()) +.validateDataFilesExist(posDeletes.second()) +.commit(); +long snapshotId3 = table.currentSnapshot().snapshotId(); + +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testEqualityDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); +long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +Schema deleteRowSchema = table.schema().select("data"); +Record dataDelete = GenericRecord.create(deleteRowSchema); +List dataDeletes = +Lists.newArrayList( +dataDelete.copy("data", "a"), // id = 29 +dataDelete.copy("data", "d"), // id = 89 +dataDelete.copy("data", "g") // id = 122 +); + +DeleteFile eqDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +dataDeletes, +deleteRowSchema); + +table.newRowDelta().addDeletes(eqDeletes).commit(); +long snapshotId3 = table.currentSnapshot().snapshotId(); + +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testMixOfPositionAndEqualityDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); 
+long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +List> deletes = +Lists.newArrayList( +Pair.of(dataFile1.path(), 0L), // id = 29 +Pair.of(dataFile1.path(), 3L) // id = 89 +); + +Pair posDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +deletes); + +Schema deleteRowSchema = table.schema().select("data"); +Record dataDelete = GenericRecord.create(deleteRowSchema); +List dataDeletes = +Lists.newArrayList( +dataDelete.copy("data", "a"), // id = 29 +dataDelete.copy("data", "g") // id = 122 +); + +DeleteFile eqDeletes = +FileHelpers.writeDeleteFile( +
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2303850176 @pvary I have fixed the implementation so that existing deletes are applied before new deletes are emitted. I have fixed the test case accordingly. (I also renamed `testFlinkScenario1` and `testFlinkScenario2`.) I think things work as you expect now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726387609 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java: ## @@ -112,13 +149,62 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); -return deletes.filter(rows(task, deletes.requiredSchema())); +int[] indexes = indexesInRow(deletes.requiredSchema()); + +return CloseableIterable.transform( +deletes.filter(rows(task, deletes.requiredSchema())), +row -> { + InternalRow expectedRow = new GenericInternalRow(columns.length); + + for (int i = 0; i < columns.length; i++) { +expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i])); + } + + return expectedRow; +}); Review Comment: When `DeleteFilter::filter` is called, if there are deletes to be applied, the `requiredSchema` could have additional columns beyond the requested schema (e.g., `_pos` is added if there are positional deletes to be applied). Thus the `InternalRow` could have more columns than the requested schema, and we need to exclude the additional columns and only keep the ones for the requested schema. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
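A small sketch of the index bookkeeping this relies on, assuming `columns` is the reader's requested column-name array (the helper body here is illustrative; only its role matches the diff):

```
// For each requested changelog column, find its position inside the delete
// filter's required schema, which may carry extra metadata columns such as
// `_pos` when position deletes must be applied.
private int[] indexesInRow(Schema requiredSchema) {
  List<Types.NestedField> fields = requiredSchema.columns();
  int[] indexes = new int[columns.length];

  for (int i = 0; i < columns.length; i++) {
    for (int pos = 0; pos < fields.size(); pos++) {
      if (fields.get(pos).name().equals(columns[i])) {
        indexes[i] = pos;
        break;
      }
    }
  }

  return indexes;
}
```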
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726379602 ## data/src/main/java/org/apache/iceberg/data/DeleteFilter.java: ## @@ -197,31 +197,31 @@ record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); } public CloseableIterable findEqualityDeleteRows(CloseableIterable records) { -// Predicate to test whether a row has been deleted by equality deletions. -Predicate deletedRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false); - -return CloseableIterable.filter(records, deletedRows); +return CloseableIterable.filter(records, isEqDeleted()); } private CloseableIterable applyEqDeletes(CloseableIterable records) { -Predicate isEqDeleted = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false); - -return createDeleteIterable(records, isEqDeleted); +return createDeleteIterable(records, isEqDeleted()); } protected void markRowDeleted(T item) { throw new UnsupportedOperationException( this.getClass().getName() + " does not implement markRowDeleted"); } - public Predicate eqDeletedRowFilter() { + // Predicate to test whether a row has been deleted by equality deletes + public Predicate isEqDeleted() { if (eqDeleteRows == null) { - eqDeleteRows = - applyEqDeletes().stream().map(Predicate::negate).reduce(Predicate::and).orElse(t -> true); + eqDeleteRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false); } return eqDeleteRows; } + // Predicate to test whether a row has not been deleted by equality deletes + public Predicate eqDeletedRowFilter() { +return isEqDeleted().negate(); + } Review Comment: This is only used in one place, `ColumnarBatchReader.ColumnarBatchLoader::applyEqDelete(ColumnarBatch)`. I propose deprecating this and changing the call to the negation of `isEqDeleted()` instead. `isEqDeleted()` is more useful. I'll do that in a separate PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
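Decoupled from Iceberg internals, the predicate relationship being proposed is simple to state; a self-contained illustration:

```
import java.util.List;
import java.util.function.Predicate;

class EqDeletePredicates<T> {
  private final List<Predicate<T>> perFileEqDeletes;

  EqDeletePredicates(List<Predicate<T>> perFileEqDeletes) {
    this.perFileEqDeletes = perFileEqDeletes;
  }

  // a row is equality-deleted if any delete file's predicate matches it
  Predicate<T> isEqDeleted() {
    return perFileEqDeletes.stream().reduce(Predicate::or).orElse(t -> false);
  }

  // the old filter is just the negation: keep rows no delete file matches
  Predicate<T> eqDeletedRowFilter() {
    return isEqDeleted().negate();
  }
}
```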
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726371909 ## gradle.properties: ## @@ -32,4 +32,4 @@ org.gradle.parallel=true org.gradle.configureondemand=true # explicitly disable the configuration cache org.gradle.configuration-cache=false -org.gradle.jvmargs=-Xmx1024m +org.gradle.jvmargs=-Xmx2048m Review Comment: Java CI failed with "Java heap space" error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302563661 @pvary thanks for posting on the dev list thread! In your description of scenario 1, you wrote:
> When the changelog scan reads the 3rd snapshot, it should consider:
> Delete reads:
> DF1 - omit records deleted by ED1, emit records deleted by ED2
> DF2 - emit records deleted by ED2
> Data read:
> DF3 - emit all records

This corresponds to case (b) in the thread. But now I understand that you must have made a mistake there and did not really mean "DF1 - omit records deleted by ED1, emit records deleted by ED2", and that you think (a) is the correct behavior. The current behavior is (b), but I'll work on changing the implementation to handle this equality delete case so that the behavior is (a). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302465047
> @pvary thanks for reviewing the tests! Just a quick response for now. For scenario 1, the behavior is as I thought you described, yet in your comment on the test code, you indicated that it is incorrect, so I am confused. To be honest, I thought about it some more yesterday and I actually think it is incorrect too. So I sent an email to the Iceberg dev list (please see that email), asking for clarification. The current behavior is case (b) in the email, which is what I thought you expected. I now think the behavior should be case (a). Please add your thoughts to the thread in the dev list too.

Sorry for the confusion @wypoon! I have tried to describe the (a) case in my description with this sentence:
> Notice that for DF1 we should not emit records which are deleted by previous deletes.

Anyway, I added my comments to the thread as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302439337 @pvary thanks for reviewing the tests! Just a quick response for now. For scenario 1, the behavior is as I thought you described, yet in your comment on the test code, you indicated that it is incorrect, so I am confused. To be honest, I thought about it some more yesterday and I actually think it is incorrect too. So I sent an email to the Iceberg dev list (please see that email), asking for clarification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
flyrain commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302430012 cc @dramaticlly -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302344676
> @pvary I have added `testFlinkScenario1` and `testFlinkScenario2` to `TestChangelogReader` based on your two scenarios. Please check the expected results. (I will rename the tests later with more descriptive names.)

These are great tests. Thanks for implementing them!

> For scenario 1, I agree with you on what the changelog should emit. If DF1 is in snapshot 3, then we should emit the row with PK1 being deleted by ED2. (The row is deleted by ED1 too, but we should only emit the row once, not twice, for snapshot 3).

Do I understand correctly that this is not yet the situation with the current code?

> For scenario 2, I think that when a row in a data file is deleted by a positional delete in the same commit, that row should neither be shown as inserted nor as deleted. This is where I think we disagree. (IIUC, you expect to see it as deleted but not as inserted. To me, that would be inconsistent.) This part of scenario 2 is actually already tested by `testAddingAndDeletingInSameCommit`.

I think we agree here. I'm perfectly fine if we can make sure that the added and immediately removed records are not emitted during the incremental scan read.

> If you agree with my analysis, then my implementation does handle at least your two scenarios correctly.

I think the output for snapshot 3 in scenario 1 is not correct. I have left a comment there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1725269441 ## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java: ## @@ -191,39 +214,361 @@ public void testMixDeleteAndInsert() throws IOException { table.newAppend().appendFile(dataFile2).commit(); long snapshotId3 = table.currentSnapshot().snapshotId(); -CloseableIterable> taskGroups = newScan().planTasks(); +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testPositionDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); +long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +List> deletes = +Lists.newArrayList( +Pair.of(dataFile1.path(), 0L), // id = 29 +Pair.of(dataFile1.path(), 3L), // id = 89 +Pair.of(dataFile2.path(), 2L) // id = 122 +); + +Pair posDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +deletes); + +table +.newRowDelta() +.addDeletes(posDeletes.first()) +.validateDataFilesExist(posDeletes.second()) +.commit(); +long snapshotId3 = table.currentSnapshot().snapshotId(); + +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testEqualityDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); +long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +Schema deleteRowSchema = table.schema().select("data"); +Record dataDelete = GenericRecord.create(deleteRowSchema); +List dataDeletes = +Lists.newArrayList( +dataDelete.copy("data", "a"), // id = 29 +dataDelete.copy("data", "d"), // id = 89 +dataDelete.copy("data", "g") // id = 122 +); + +DeleteFile eqDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +dataDeletes, +deleteRowSchema); + +table.newRowDelta().addDeletes(eqDeletes).commit(); +long snapshotId3 = table.currentSnapshot().snapshotId(); + +List rows = getChangelogRows(table); + +List expectedRows = Lists.newArrayList(); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); +addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); +addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + +assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testMixOfPositionAndEqualityDeletes() throws IOException { +table.newAppend().appendFile(dataFile1).commit(); 
+long snapshotId1 = table.currentSnapshot().snapshotId(); + +table.newAppend().appendFile(dataFile2).commit(); +long snapshotId2 = table.currentSnapshot().snapshotId(); + +List> deletes = +Lists.newArrayList( +Pair.of(dataFile1.path(), 0L), // id = 29 +Pair.of(dataFile1.path(), 3L) // id = 89 +); + +Pair posDeletes = +FileHelpers.writeDeleteFile( +table, +Files.localOutput(File.createTempFile("junit", null, temp.toFile())), +TestHelpers.Row.of(0), +deletes); + +Schema deleteRowSchema = table.schema().select("data"); +Record dataDelete = GenericRecord.create(deleteRowSchema); +List dataDeletes = +Lists.newArrayList( +dataDelete.copy("data", "a"), // id = 29 +dataDelete.copy("data", "g") // id = 122 +); + +DeleteFile eqDeletes = +FileHelpers.writeDeleteFile( +t
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1725175321

## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java:
## @@ -191,39 +214,361 @@ public void testMixDeleteAndInsert() throws IOException {
     table.newAppend().appendFile(dataFile2).commit();
     long snapshotId3 = table.currentSnapshot().snapshotId();

-    CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = newScan().planTasks();
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testPositionDeletes() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile1.path(), 0L), // id = 29
+            Pair.of(dataFile1.path(), 3L), // id = 89
+            Pair.of(dataFile2.path(), 2L) // id = 122
+            );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            deletes);
+
+    table
+        .newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testEqualityDeletes() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("data", "a"), // id = 29
+            dataDelete.copy("data", "d"), // id = 89
+            dataDelete.copy("data", "g") // id = 122
+            );
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            dataDeletes,
+            deleteRowSchema);
+
+    table.newRowDelta().addDeletes(eqDeletes).commit();
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testMixOfPositionAndEqualityDeletes() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile1.path(), 0L), // id = 29
+            Pair.of(dataFile1.path(), 3L) // id = 89
+            );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            deletes);
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("data", "a"), // id = 29
+            dataDelete.copy("data", "g") // id = 122
+            );
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            t
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2297933397

@pvary I have added `testFlinkScenario1` and `testFlinkScenario2` to `TestChangelogReader`. Please check the expected results. (I will rename the tests later with more descriptive names.)

For scenario 1, I agree with you on what the changelog should emit. If DF1 is in snapshot 3, then we should emit the row with PK1 being deleted by ED2. (The row is deleted by ED1 too, but we should only emit the row once, not twice, for snapshot 3.)

For scenario 2, I think that when a row in a data file is deleted by a positional delete in the same commit, that row should neither be shown as inserted nor as deleted. This is where I think we disagree. (IIUC, you expect to see it as deleted but not as inserted. To me, that would be inconsistent.) This part of scenario 2 is actually already tested by `testAddingAndDeletingInSameCommit`.

If you agree with my analysis, then my implementation does handle at least your two scenarios correctly.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
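A minimal sketch of the same-commit semantics argued for above, written in the style of the `TestChangelogReader` diff earlier in this thread. `getChangelogRows`, `addExpectedRows`, and `internalRowsToJava` are the helpers visible in that diff; `dataFileB` and `recordsBLive` (the file's records minus the position-deleted one) are hypothetical fixtures, so this is an illustration, not the PR's actual test:

```java
// Hypothetical sketch: a row added and position-deleted in the same commit
// should appear in the changelog neither as INSERT nor as DELETE.
@Test
public void testRowAddedAndDeletedInSameCommit() throws IOException {
  // Delete position 0 of dataFileB in the same row delta that adds the file.
  Pair<DeleteFile, CharSequenceSet> posDeletes =
      FileHelpers.writeDeleteFile(
          table,
          Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
          TestHelpers.Row.of(0),
          Lists.newArrayList(Pair.of(dataFileB.path(), 0L)));

  table
      .newRowDelta()
      .addRows(dataFileB)
      .addDeletes(posDeletes.first())
      .commit();
  long snapshotId = table.currentSnapshot().snapshotId();

  List<InternalRow> rows = getChangelogRows(table);

  // Only the rows of dataFileB that survive the position delete are expected,
  // all as INSERTs; the deleted row produces no changelog record at all.
  List<Object[]> expectedRows = Lists.newArrayList();
  addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId, 0, recordsBLive);

  assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
}
```

This mirrors what `testAddingAndDeletingInSameCommit` already asserts, per the comment above.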
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2295126370

> For the positional delete case, I do not believe deleting the same position in the same data file again can happen

I agree that this should be the case. My only concern is that, until now, it didn't cause any issues, so we might not have thought about it and didn't clarify it in the specification. If we have a hard requirement here, we might want to clarify it in the spec too.

> Just to clarify, in your scenario1, for example, does ED1 contain PK=PK1? In other words, PK1 is the value of the primary key in the table, right? And by V1 you simply mean the values of the other columns? And then ED2 again contains PK=PK1, so that you can update the other columns of the same row with V2, right?

Yes, you understand correctly. That's what I was trying to describe.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Support changelog scan for table with delete files [iceberg]
wypoon commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2294958003

@pvary thank you for your interest and for the Flink scenarios, which are very helpful as I am unfamiliar with Flink.

Regarding https://github.com/apache/iceberg/pull/9888, please read my comments there. I put up https://github.com/apache/iceberg/pull/10954 only as a reference for @manuzhang so he can see the tests I added which fail with the implementation of `BaseIncrementalChangelogScan` in https://github.com/apache/iceberg/pull/9888. https://github.com/apache/iceberg/pull/10954 is not really for consideration or review. Next week I'll look into the scenarios you listed, see what gaps there are in my implementation, and add tests as necessary.

Regarding `DeletedRowsScanTask`, when @aokolnychyi added the API, he figured that it was necessary to know what deletes existed before the snapshot and what deletes are added in the snapshot. I'll have to think through the equality delete case, but for the positional delete case, I believe that it is not necessary to know the existing deletes (which is the reason for my `// not used` comment). For the positional delete case, I do not believe deleting the same position in the same data file again can happen, so added deletes should always be new positions. Thus, we only need to scan the data file and emit the rows that are deleted by the added deletes (which I do by using the `_pos` metadata column and the `PositionDeleteIndex` a `DeleteFilter` constructs for the data file). The `_pos` metadata column is automatically added to the schema if there are any position delete files to be applied to the data file, and for a `DeletedRowsScanTask`, the `DeleteFilter` is constructed with the added delete files (so those are the position delete files to be applied).

I admit that I hadn't thought through equality deletes carefully. Just to clarify, in your scenario1, for example, does ED1 contain `PK=PK1`? In other words, `PK1` is the value of the primary key in the table, right? And by `V1` you simply mean the values of the other columns? And then ED2 again contains `PK=PK1`, so that you can update the other columns of the same row with `V2`, right?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
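To make the mechanism described above concrete, here is a rough sketch of the position-delete path of a `DeletedRowsScanTask`. `DeletedRowsScanTask.addedDeletes()` and `PositionDeleteIndex.isDeleted(long)` exist in the Iceberg API, but `positionIndexFor`, `RowWithPos`, `readRowsWithPos`, and `emitDelete` are hypothetical stand-ins for the surrounding reader plumbing, so treat this as an illustration of the idea rather than the PR's implementation:

```java
// Sketch: produce DELETE changelog records for a DeletedRowsScanTask.
// Because the same position cannot be deleted twice, only the delete files
// *added* in the snapshot matter here; existing deletes need not be consulted.
PositionDeleteIndex addedPositions = positionIndexFor(task.addedDeletes()); // hypothetical helper

for (RowWithPos row : readRowsWithPos(task.file())) { // hypothetical reader exposing _pos
  if (addedPositions.isDeleted(row.pos())) {
    emitDelete(row); // becomes a DELETE record for this snapshot
  }
}
```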
Re: [PR] Support changelog scan for table with delete files [iceberg]
pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2294754336

@wypoon, @manuzhang: I'm interested in providing Flink streaming CDC reads. That would require working changelog scan planning in Iceberg core, so I would be happy to help with my reviews here. I'm not an expert in this part of the code yet, so if the PR becomes more complicated, we might have to ask for help from folks more experienced here, but I can do a first round of the reviews.

I see that there are multiple PRs for this topic (#9888, #10954). I would suggest focusing the discussion on one specific PR.

There are some tricky scenarios with Flink CDC that we need to handle:

---

Scenario 1:
- 1st snapshot: Writing a new record to the table with primary key 1 (PK1) and value 1 (V1)
  - Creates data file 1 (DF1) - with PK1-V1
- 2nd snapshot: The record with PK1 is updated with value V2
  - Creates equality delete 1 (ED1) - with PK1
  - Creates a new data file 2 (DF2) - with PK1-V2
- 3rd snapshot: The record with PK1 is updated with value V3
  - Creates equality delete 2 (ED2) - with PK1
  - Creates a new data file 3 (DF3) - with PK1-V3

When the changelog scan reads the 3rd snapshot, it should consider (see the sketch after this message):
- Delete reads:
  - DF1 - omit records deleted by ED1, emit records deleted by ED2
  - DF2 - emit records deleted by ED2
- Data read:
  - DF3 - emit all records

Notice that for DF1 we should not emit records which are deleted by previous deletes. So I am concerned about your `// not used` comment [here](https://github.com/apache/iceberg/pull/10935/files#diff-24a8b4b8a16e20a7727ca9b04754fa00cbb40228aa79b3e63baf958aef7bec04R266) 😄

---

Scenario 2:
- 1st snapshot: Writing a new record to the table with primary key 1 (PK1) and value 1 (V1)
  - Creates data file 1 (DF1) - with PK1-V1
- 2nd snapshot: The record with PK1 is updated with value V2, but later updated again with value V3
  - Creates equality delete 1 (ED1) - with PK1
  - Creates a new data file 2 (DF2) - with PK1-V2
  - Creates positional delete 1 (PD1) - for DF2-PK1
  - Here we have 2 possibilities:
    - Adds a new line to DF2 with PK1-V3 - if the data file target file size is not reached yet
    - Creates a new data file 3 (DF3) - with PK1-V3 - if the data file is already rolled over

When the changelog scan reads the 2nd snapshot, it should consider:
- Delete reads:
  - DF1 - emit records deleted by ED1 - the emitted record is: D(PK1-V1)
  - DF2 - emit records deleted by PD1 - the emitted record is: D(PK1-V2)
- Data reads:
  - DF2 - omit records deleted by PD1 - the emitted record is: I(PK1-V2)
  - DF3 - emit all records - the emitted record is: I(PK1-V3)

Notice that the order of the records is important, and it is not trivial to preserve if the files are read in a distributed way.

CC: @dramaticlly

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
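A compact restatement of the delete-read rule from Scenario 1 above, as a hedged sketch: `readAll`, `isDeletedBy`, `existingDeletes`, `addedDeletes`, and `emit` are hypothetical names for illustration, not Iceberg APIs:

```java
// Delete read of a pre-existing data file (DF1 or DF2) for one incremental snapshot:
for (Row row : readAll(dataFile)) {           // hypothetical reader over the data file
  if (isDeletedBy(row, existingDeletes)) {
    continue;                                 // already deleted before this snapshot (e.g. by ED1): skip
  }
  if (isDeletedBy(row, addedDeletes)) {
    emit(ChangelogOperation.DELETE, row);     // first deleted in this snapshot (e.g. by ED2): emit once
  }
}
// Data files added in the snapshot itself (e.g. DF3) emit all their live rows as INSERTs.
```

The existing-deletes check is exactly why DF1's rows deleted by ED1 must not be re-emitted when ED2 arrives.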