Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-12-17 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2550247726

   @flyrain you had indicated that you were interested in this. Can you please 
review?





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-11-24 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2496928372

   @aokolnychyi I have rebased the PR on main. How can we move forward with 
this?
   If we agree that the behavior is correct, how about we at least have 
something that works and optimize it later?
   





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-11-17 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2481942281

   @aokolnychyi I agree that we should stick to existing changelog tasks and 
always resolve historical deletes to produce the changelog. Have you thought of 
any optimizations for processing the snapshots? How can we move forward with 
this?
   





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-10-22 Thread via GitHub


aokolnychyi commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2430250182

   Coming back to some questions above.
   
   > Do we want to resolve equality deletes and map them into data files? Or 
should we add a new task and output the content of equality delete files?
   
   While it would be useful to stream out the ingested CDC log from Flink without applying equality deletes, we probably shouldn't target that for now. If we start outputting equality deletes directly, our changelog may not be accurate. What if we upsert a record that didn't exist? What if an equality delete applied to 10 data records? We can define reasonable behavior, but let's skip this for now. Let's always resolve and assign equality deletes so that the changelog is precise.
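   To make the first failure mode concrete, here is a toy sketch (editor's illustration, not PR code; all names are invented): an upsert may write an equality delete for a key that never existed, and emitting the delete file's content as-is would over-report DELETEs, while resolving against live rows keeps the log precise.
   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;

   class EqualityDeleteResolution {
     // Keep only delete keys that actually match a live row; unmatched keys
     // (e.g. an upsert of a record that never existed) produce no DELETE.
     static List<Long> preciseDeletes(List<Long> deleteKeys, Set<Long> liveKeys) {
       return deleteKeys.stream().filter(liveKeys::contains).collect(Collectors.toList());
     }

     public static void main(String[] args) {
       Set<Long> liveKeys = Set.of(1L, 2L, 3L);
       // The equality delete references id=2 (a real row) and id=5 (never existed).
       System.out.println(preciseDeletes(List.of(2L, 5L), liveKeys)); // prints [2]
     }
   }
   ```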
   
   > What if a snapshot adds a new delete file for the already deleted record? 
That can happen both with position and equality deletes.
   
   There is one more data point here. We are about to introduce sync 
maintenance for position deletes. This means new delete files will include old 
+ new deleted positions. Therefore, we must always resolve historic deletes.
   
   To sum up, I propose sticking to the existing changelog tasks and always 
resolving historic deletes to produce a correct changelog. I am concerned about 
full table scans for each incremental snapshot. I'll look into ideas mentioned 
above to see if we can optimize that.
   





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-10-20 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2425520426

   > For instance, if two position-based delete operations remove the same set 
of records, both of them will succeed.
   
   I was not aware of this. Is this specifically the case of two concurrent delete operations starting from the same base snapshot, both removing the same set of records, where one wins the commit race but the other still succeeds in committing?
   Or is this more general, where a delete that deletes already-deleted records is allowed to do so?
   
   Let's say we are in the former case; then I'd say that for the purpose of CDC, the second commit contains no changes, so the changelog should not write anything for it.





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-10-17 Thread via GitHub


pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2420515789

   > First of all, we need to discuss the expected behavior:
   > 
   > * Do we want to resolve equality deletes and map them into data files? Or 
should we add a new task and output the content of equality delete files? I'd 
say we should support both options (without resolving by default?).
   
   If the table is written by Flink, then for a Flink CDC streaming read the lazy solution (not resolving the equality deletes) would be enough. If there is another writer for the table that creates equality delete files which don't conform to Flink's conventions, or the user wants a retracting CDC stream from a table written by an upsert CDC stream, then equality delete resolution is still needed.
   
   > * What if a snapshot adds a new delete file for an already deleted record? That can happen both with position and equality deletes. For instance, if two position-based delete operations remove the same set of records, both of them will succeed. Producing a precise CDC log would require reading all historic deletes, which may be unnecessarily expensive in some cases. I'd say this should be configurable as well.
   
   I think that applying delete files is less costly, so I would stick to the theoretically correct solution and apply previously added delete files to produce the precise CDC log.
   





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-10-16 Thread via GitHub


aokolnychyi commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2418069515

   What I wrote above is more complicated; we need to understand whether that complexity would be justified. I am not sure yet.





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-10-16 Thread via GitHub


aokolnychyi commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2418068303

   I went through some of my old notes, which we should discuss.
   
   We have the following tasks right now:
   - `AddedRowsScanTask` (added data file + deletes that happened within the 
same snapshot).
   - `DeletedDataFileScanTask` (removed data file + deletes that applied to it 
before it was removed).
   - `DeletedRowsScanTask` (data file that was affected by a delete file (if we 
resolve equality deletes or we had position deletes) + historic deletes that 
were there before + new deletes added in this snapshot).
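   For reference, a minimal consumer-side sketch of how these tasks surface through the scan API (method names as exercised by the tests in this PR; `startSnapshotId`/`endSnapshotId` are placeholders, and the comments only paraphrase the task semantics):
   ```java
   IncrementalChangelogScan scan =
       table.newIncrementalChangelogScan()
           .fromSnapshotExclusive(startSnapshotId)
           .toSnapshot(endSnapshotId);

   try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
     for (ChangelogScanTask task : tasks) {
       if (task instanceof AddedRowsScanTask) {
         AddedRowsScanTask added = (AddedRowsScanTask) task;
         // rows of added.file(), minus added.deletes(), are INSERTs
       } else if (task instanceof DeletedDataFileScanTask) {
         DeletedDataFileScanTask removed = (DeletedDataFileScanTask) task;
         // live rows of removed.file(), minus removed.existingDeletes(), are DELETEs
       } else if (task instanceof DeletedRowsScanTask) {
         DeletedRowsScanTask deleted = (DeletedRowsScanTask) task;
         // rows matched by deleted.addedDeletes(), excluding rows already covered
         // by deleted.existingDeletes(), are DELETEs
       }
     }
   } catch (java.io.IOException e) {
     throw new java.io.UncheckedIOException(e); // planFiles() result must be closed
   }
   ```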
   
   First of all, we need to discuss the expected behavior:
   - Do we want to resolve equality deletes and map them into data files? Or 
should we add a new task and output the content of equality delete files? I'd 
say we should support both options (without resolving by default?).
   - What if a snapshot adds a new delete file for an already deleted record? That can happen both with position and equality deletes. For instance, if two position-based delete operations remove the same set of records, both of them will succeed. Producing a precise CDC log would require reading all historic deletes, which may be unnecessarily expensive in some cases. I'd say this should be configurable as well.
   
   If we want to make resolving equality deletes optional, we need 
`AddedEqualityDeletesScanTask`. We discussed this for Flink CDC use cases. It 
is going to be costly at planning and query time to apply equality deletes to 
data files to get removed records. As long as the equality delete persists the 
entire row or the caller is OK with only equality columns, it should be fine to 
output the content of equality delete files as is.
   
   I agree with the idea of iterating snapshot by snapshot. I wonder if we can 
optimize it, though.
   
   **AddedRowsScanTask**
   
   We don’t have to look up historic deletes, since the file was just added; the only exception is the position deletes added in the same snapshot. For each added data file, it is enough to look up matching position deletes in a `DeleteFileIndex` built from the delete manifests added in this snapshot.
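   A sketch of that lookup (editor's illustration under stated assumptions: `deleteManifestsAddedIn` and `dataFilesAddedIn` are hypothetical helpers, method shapes are approximate, and this would have to live inside core since `DeleteFileIndex` is not public API):
   ```java
   // Index only the delete manifests committed in this snapshot.
   DeleteFileIndex snapshotDeletes =
       DeleteFileIndex.builderFor(table.io(), deleteManifestsAddedIn(snapshot)) // hypothetical
           .specsById(table.specs())
           .build();

   for (DataFile addedFile : dataFilesAddedIn(snapshot)) { // hypothetical
     // Position deletes committed in the same snapshot that apply to the new file.
     DeleteFile[] matched = snapshotDeletes.forDataFile(snapshot.sequenceNumber(), addedFile);
     // emit an AddedRowsScanTask for (addedFile, matched)
   }
   ```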
   
   **DeletedDataFileScanTask**
   
   We have to build `DeleteFileIndex` that includes all historic deletes for 
removed data files. We can optimize this step by reading delete manifests only 
for the affected partitions. We can create a `PartitionMap` predicate from new 
data files and use it while reading delete manifests. We can supplement that 
with a predicate on `referencedDataFile` in the future. Historic delete files 
that are not affecting deleted data file partitions can be discarded.
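   A sketch of the partition-based pruning (again illustrative, not PR code; `dataFilesRemovedIn` and `mayContainDeletesFor` are hypothetical, while `PartitionSet` is an existing util class):
   ```java
   // Collect the partitions touched by data files removed in this snapshot.
   PartitionSet removedPartitions = PartitionSet.create(table.specs());
   for (DataFile removedFile : dataFilesRemovedIn(snapshot)) { // hypothetical
     removedPartitions.add(removedFile.specId(), removedFile.partition());
   }

   // Read only historic delete manifests that may overlap those partitions.
   List<ManifestFile> relevant = Lists.newArrayList();
   for (ManifestFile deleteManifest : historicDeleteManifests) {
     if (mayContainDeletesFor(deleteManifest, removedPartitions)) { // hypothetical overlap check
       relevant.add(deleteManifest);
     }
   }
   ```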
   
   **DeletedRowsScanTask**
   
   For each position delete file and each equality delete file (if those must 
be resolved), find matching data files and historic deletes (if configured to 
resolve double deletes). We can still build a partition predicate from all 
delete files that were added in this snapshot. We can use that predicate to 
prune the set of data manifests for that snapshot. For each newly added delete 
file, output a task for each affected data file (one delete file can match 
multiple data files).
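   In pseudocode form (hypothetical helpers throughout, sketching the proposal rather than an implementation):
   ```java
   for (DeleteFile newDelete : deleteFilesAddedIn(snapshot)) { // hypothetical
     // One delete file can match multiple data files; emit one task per match.
     for (DataFile dataFile : matchingDataFiles(newDelete)) { // pruned via the partition predicate
       List<DeleteFile> historic =
           resolveDoubleDeletes ? historicDeletesFor(dataFile) : ImmutableList.of();
       // emit DeletedRowsScanTask(dataFile, added = [newDelete], existing = historic)
     }
   }
   ```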
   
   **AddedEqualityDeletesScanTask**
   
   Output each added equality delete file as a separate task without matching 
them to data files (if configured).
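   A purely hypothetical shape for such a task (nothing like it exists in the codebase yet; it just mirrors the other `ChangelogScanTask` types):
   ```java
   interface AddedEqualityDeletesScanTask extends ChangelogScanTask {
     // The equality delete file whose rows would be emitted as DELETE records,
     // subject to the caveat that it may persist only the equality columns.
     DeleteFile file();
   }
   ```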





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-10-16 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1803801850


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +61,39 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
+    Iterable<CloseableIterable<ChangelogScanTask>> plans =
         FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
-
-    ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-            .specsById(table().specs())
-            .caseSensitive(isCaseSensitive())
-            .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting()
-            .columnsToKeepStats(columnsToKeepStats());
-
-    if (shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-      manifestGroup = manifestGroup.planWith(planExecutor());
-    }
-
-    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+            .transform(
+                snapshot -> {
+                  List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
+                  List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());
+
+                  ManifestGroup manifestGroup =
+                      new ManifestGroup(table().io(), dataManifests, deleteManifests)

Review Comment:
   I went snapshot by snapshot as it was the easiest to reason about. For sure we need to consider more than just newly added manifests: for each snapshot, we also need to consider existing data files if there are new deletes applicable to them.






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-10-16 Thread via GitHub


aokolnychyi commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1803692669


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +61,39 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
+    Iterable<CloseableIterable<ChangelogScanTask>> plans =
         FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
-
-    ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-            .specsById(table().specs())
-            .caseSensitive(isCaseSensitive())
-            .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting()
-            .columnsToKeepStats(columnsToKeepStats());
-
-    if (shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-      manifestGroup = manifestGroup.planWith(planExecutor());
-    }
-
-    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+            .transform(
+                snapshot -> {
+                  List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
+                  List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());
+
+                  ManifestGroup manifestGroup =
+                      new ManifestGroup(table().io(), dataManifests, deleteManifests)

Review Comment:
   This will cause a substantial performance hit, as we will scan all data and delete manifests that match the filter for each changelog snapshot, instead of opening only newly added manifests as before. We may have to do that for deletes anyway, but I wonder about data manifests. Let me think a bit.






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-10-16 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2417568428

   @flyrain and @aokolnychyi as you have both expressed interest in reviewing this, I hope you can find time to do so soon! Thank you!





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-10-11 Thread via GitHub


aokolnychyi commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2407845436

   Will start looking into this PR today and should be able to finish over the 
weekend.





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-25 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2374326949

   @flyrain you worked on this area and implemented some of the changelog 
support; can you please review?





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-25 Thread via GitHub


pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2373258387

   @RussellSpitzer, @aokolnychyi: would it be possible to take a look at this 
PR? I did my best to review it, but I'm not an expert in this part of the code.
   Thanks, Peter 





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-24 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2372741865

   @pvary can you please help move this forward then?





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-23 Thread via GitHub


pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2367649336

   This looks good to me.
   @dramaticlly: Any more comments, before we try to involve the guys who are 
more experienced with the core/spark parts?





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-23 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1771032932


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();

Review Comment:
   Cool!
   Thanks for the explanation!






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-23 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1771031788


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,139 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testPositionDeletes() {

Review Comment:
   Thanks!
   I like your new tests!






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767624705


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();

Review Comment:
   Ah, you mean compaction as in rewriting data files.
   Compaction does not result in any inserts or deletes as far as the changelog 
is concerned. (I believe this is what we both expect.) Please see 
`testDataRewritesAreIgnored` that I added in `TestChangelogTable` in this PR. 
This test is in spark-extensions because I want to call the rewrite_data_files 
Spark procedure.
   
   (Note that in that test, there is a snapshot between `snap2` and `snap3`, 
namely the snapshot where the rewrite_data_files is committed, but I don't 
capture it, because no change for it shows up in the changelog (which is the 
point). In `snap3`, another delete occurs, and that shows up in the changelog.)
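   For context, a sketch of that scenario (editor's illustration, not the test itself; catalog/table names and snapshot ids are placeholders, and it assumes the Iceberg Spark extensions are on the session):
   ```java
   // Compact data files; this commits a replace snapshot.
   spark.sql("CALL my_catalog.system.rewrite_data_files(table => 'db.tbl')");

   // Reading the changelog across that snapshot yields no rows for it.
   spark.read()
       .format("iceberg")
       .option("start-snapshot-id", String.valueOf(snap2Id)) // placeholder ids
       .option("end-snapshot-id", String.valueOf(snap3Id))
       .load("my_catalog.db.tbl.changes")
       .show();
   ```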






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767544018


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();

Review Comment:
   This is the question of how we view compaction, where we rewrite files with the expressed goal of keeping the actual content unchanged.
   - Is it a change that removes and adds files which coincidentally contain the same data?
   - Or is it a change where no data has changed?
   
   I see the reasoning behind both views, but the first approach would most probably make the changelog scan infeasible for tables where compaction happens.






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767321018


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();

Review Comment:
   ps. Net changes is something a user might want regardless of whether 
merge-on-read or copy-on-write is used, so is an orthogonal concern.






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767319143


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();

Review Comment:
   I'm not sure I understand your question. Are you referring to outputting *net changes*? If so, I think that should be a separate PR, and we should introduce an option for it.
   If you're referring to something else, can you please explain?
   






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767295969


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+    Snapshot snap4 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+    DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+    assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+    assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
+    assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
+    assertThat(t1.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_B_DELETES.path());
+    assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+    assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+    assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId());
+    assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_C.path());
+    assertThat(t2.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_C2_DELETES.path());
+    assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+    assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+    assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
+    assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
+    assertThat(t3.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
+    assertThat(t3.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A2_DELETES.path());
+    assertThat(t3.addedDeletes().get(1).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A_DELETES.path());

Review Comment:
   The test is not flaky, but you have a good point. We need not depend on the 
order. We can follow a variant of @dramaticlly's suggestion below:
   ```
   assertThat(t3.addedDeletes())
   .hasSize(2)
   .extracting(DeleteFile::path)
   .as("Added delete file must match")
   .containsExactlyInAnyOrder(
   FILE_A2_DELETES.path(),
   FILE_A_DELETES.path());
   ```






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767285190


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+    Snapshot snap4 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+    DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+    assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+    assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
+    assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
+    assertThat(t1.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_B_DELETES.path());
+    assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+    assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+    assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId());
+    assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_C.path());
+    assertThat(t2.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_C2_DELETES.path());
+    assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+    assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+    assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
+    assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
+    assertThat(t3.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
+    assertThat(t3.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A2_DELETES.path());
+    assertThat(t3.addedDeletes().get(1).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A_DELETES.path());
+    assertThat(t3.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t4 = (DeletedRowsScanTask) tasks.get(3);
+    assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+    assertThat(t4.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
+    assertThat(t4.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
+    assertThat(t4.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
+    assertThat(t4.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A2_DELETES.path());
+    assertThat(t4.addedDeletes().get(1).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A_DELETES.path());
+    assertThat(t4.existingDeletes()).as("Must be no existing deletes").isEmpty();

Review Comment:
   That sounds good. Thanks!






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767264496


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+    Snapshot snap4 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+    DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+    assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+    assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
+    assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
+    assertThat(t1.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_B_DELETES.path());
+    assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+    assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+    assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId());
+    assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_C.path());
+    assertThat(t2.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_C2_DELETES.path());
+    assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+    assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+    assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
+    assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
+    assertThat(t3.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
+    assertThat(t3.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A2_DELETES.path());
+    assertThat(t3.addedDeletes().get(1).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A_DELETES.path());

Review Comment:
   If I understand this correctly, the test result seems to be consistent here: when building the `DeleteFileIndex`, we always concat the equality deletes (A2_DELETES) before the position deletes (A_DELETES) for a given data file entry (see https://github.com/apache/iceberg/blob/9dbfbbb203c007592b8ba5f4816925043c08923a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java#L139), and that array ordering seems to be maintained throughout the code.
   
   IMO, this is not necessarily something we need to assert the ordering on.






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1767226037


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+    Snapshot snap4 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+    DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+    assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+    assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
+    assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
+    assertThat(t1.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_B_DELETES.path());
+    assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+    assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+    assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId());
+    assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_C.path());
+    assertThat(t2.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_C2_DELETES.path());
+    assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+    assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+    assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
+    assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
+    assertThat(t3.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
+    assertThat(t3.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A2_DELETES.path());
+    assertThat(t3.addedDeletes().get(1).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A_DELETES.path());
+    assertThat(t3.existingDeletes()).as("Must be no existing deletes").isEmpty();
+
+    DeletedRowsScanTask t4 = (DeletedRowsScanTask) tasks.get(3);
+    assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+    assertThat(t4.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
+    assertThat(t4.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
+    assertThat(t4.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
+    assertThat(t4.addedDeletes().get(0).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A2_DELETES.path());
+    assertThat(t4.addedDeletes().get(1).path())
+        .as("Added delete file must match")
+        .isEqualTo(FILE_A_DELETES.path());
+    assertThat(t4.existingDeletes()).as("Must be no existing deletes").isEmpty();

Review Comment:
   In that sense, I believe we can simplify the assertions on addedDeletes to this (with some code reduction):
   ```java
   assertThat(t3.addedDeletes())
   .hasSize(2)
   .extracting(DeleteFile::path)
   .as("Added delete file must match")
   .containsExactly(
   FILE_A2_DELETES.path(),
   FILE_A_DELETES.path());
   ```






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1766615044


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
     assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // position delete
+    table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // equality delete
+    table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+    Snapshot snap3 = table.currentSnapshot();
+
+    // mix of position and equality deletes
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();

Review Comment:
   One more interesting question is:
   - How will we handle compaction commits? Is there a better way than emitting every insert/delete record? Could we skip them?
   
   Is this trivial to fix in this PR, or does it need another PR?






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-19 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1766608610


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
 assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+assumeThat(formatVersion).isEqualTo(2);
+
+table
+.newFastAppend()
+.appendFile(FILE_A)
+.appendFile(FILE_A2)
+.appendFile(FILE_B)
+.appendFile(FILE_C)
+.commit();
+Snapshot snap1 = table.currentSnapshot();
+
+// position delete
+table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+Snapshot snap2 = table.currentSnapshot();
+
+// equality delete
+table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+Snapshot snap3 = table.currentSnapshot();
+
+// mix of position and equality deletes
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+Snapshot snap4 = table.currentSnapshot();
+
+IncrementalChangelogScan scan =
+
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+List tasks = plan(scan);
+
+assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+assertThat(t1.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap2.snapshotId());
+assertThat(t1.file().path()).as("Data file must 
match").isEqualTo(FILE_B.path());
+assertThat(t1.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_B_DELETES.path());
+assertThat(t1.existingDeletes()).as("Must be no existing 
deletes").isEmpty();
+
+DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+assertThat(t2.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap3.snapshotId());
+assertThat(t2.file().path()).as("Data file must 
match").isEqualTo(FILE_C.path());
+assertThat(t2.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_C2_DELETES.path());
+assertThat(t2.existingDeletes()).as("Must be no existing 
deletes").isEmpty();
+
+DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+assertThat(t3.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap4.snapshotId());
+assertThat(t3.file().path()).as("Data file must 
match").isEqualTo(FILE_A2.path());
+assertThat(t3.addedDeletes().size()).as("Number of added delete files must 
match").isEqualTo(2);
+assertThat(t3.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A2_DELETES.path());
+assertThat(t3.addedDeletes().get(1).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A_DELETES.path());

Review Comment:
   would this be flaky? Is there a guarantee for the order of the files?
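   If the order is not guaranteed, one way to harden the assertion is to make 
it order-insensitive; a minimal AssertJ sketch using the names from the test 
above:
   
   ```java
   assertThat(t3.addedDeletes())
       .extracting(DeleteFile::path)
       .as("Added delete files must match, in any order")
       .containsExactlyInAnyOrder(FILE_A2_DELETES.path(), FILE_A_DELETES.path());
   ```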






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-18 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1765955792


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
 assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+assumeThat(formatVersion).isEqualTo(2);
+
+table
+.newFastAppend()
+.appendFile(FILE_A)
+.appendFile(FILE_A2)
+.appendFile(FILE_B)
+.appendFile(FILE_C)
+.commit();
+Snapshot snap1 = table.currentSnapshot();
+
+// position delete
+table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+Snapshot snap2 = table.currentSnapshot();
+
+// equality delete
+table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+Snapshot snap3 = table.currentSnapshot();
+
+// mix of position and equality deletes
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+Snapshot snap4 = table.currentSnapshot();
+
+IncrementalChangelogScan scan =
+
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+List tasks = plan(scan);
+
+assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+assertThat(t1.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap2.snapshotId());
+assertThat(t1.file().path()).as("Data file must 
match").isEqualTo(FILE_B.path());
+assertThat(t1.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_B_DELETES.path());
+assertThat(t1.existingDeletes()).as("Must be no existing 
deletes").isEmpty();
+
+DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+assertThat(t2.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap3.snapshotId());
+assertThat(t2.file().path()).as("Data file must 
match").isEqualTo(FILE_C.path());
+assertThat(t2.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_C2_DELETES.path());
+assertThat(t2.existingDeletes()).as("Must be no existing 
deletes").isEmpty();
+
+DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+assertThat(t3.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap4.snapshotId());
+assertThat(t3.file().path()).as("Data file must 
match").isEqualTo(FILE_A2.path());
+assertThat(t3.addedDeletes().size()).as("Number of added delete files must 
match").isEqualTo(2);
+assertThat(t3.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A2_DELETES.path());
+assertThat(t3.addedDeletes().get(1).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A_DELETES.path());
+assertThat(t3.existingDeletes()).as("Must be no existing 
deletes").isEmpty();
+
+DeletedRowsScanTask t4 = (DeletedRowsScanTask) tasks.get(3);
+assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+assertThat(t4.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap4.snapshotId());
+assertThat(t4.file().path()).as("Data file must 
match").isEqualTo(FILE_A.path());
+assertThat(t4.addedDeletes().size()).as("Number of added delete files must 
match").isEqualTo(2);
+assertThat(t4.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A2_DELETES.path());
+assertThat(t4.addedDeletes().get(1).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A_DELETES.path());
+assertThat(t4.existingDeletes()).as("Must be no existing 
deletes").isEmpty();

Review Comment:
   Thanks for thinking about this and for the idea. I learned something about 
AssertJ from you!
   As I see it, however, the example you show does not actually result in a 
reduction in code; it is just organized differently. Also, as far as using 
the `RecursiveComparisonConfiguration` goes, that part really does not reduce 
code at all. Conceptually, we want to check what the 
deletes/addedDeletes/existingDeletes are, and it is enough to check that the 
file path matches the path of the `DeleteFile`s we expect. In other words,
   ```
   assertThat(t4.addedDeletes().get(0).path())
       .isEqualTo(FILE_A2_DELETES.path());
   ```
   is just as good as
   ```
   assertThat(t4.addedDeletes().get(0))
       .usingRecursiveComparison(FILE_COMPARISON_CONFIG)
       .isEqualTo(FILE_A2_DELETES);
   ```
   for our purposes, and is slightly shorter.




Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-18 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1765491165


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,175 @@ public void testFileDeletes() {
 assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testRowDeletes() {
+assumeThat(formatVersion).isEqualTo(2);
+
+table
+.newFastAppend()
+.appendFile(FILE_A)
+.appendFile(FILE_A2)
+.appendFile(FILE_B)
+.appendFile(FILE_C)
+.commit();
+Snapshot snap1 = table.currentSnapshot();
+
+// position delete
+table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+Snapshot snap2 = table.currentSnapshot();
+
+// equality delete
+table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+Snapshot snap3 = table.currentSnapshot();
+
+// mix of position and equality deletes
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+Snapshot snap4 = table.currentSnapshot();
+
+IncrementalChangelogScan scan =
+
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+List tasks = plan(scan);
+
+assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+assertThat(t1.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap2.snapshotId());
+assertThat(t1.file().path()).as("Data file must 
match").isEqualTo(FILE_B.path());
+assertThat(t1.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_B_DELETES.path());
+assertThat(t1.existingDeletes()).as("Must be no existing 
deletes").isEmpty();
+
+DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+assertThat(t2.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap3.snapshotId());
+assertThat(t2.file().path()).as("Data file must 
match").isEqualTo(FILE_C.path());
+assertThat(t2.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_C2_DELETES.path());
+assertThat(t2.existingDeletes()).as("Must be no existing 
deletes").isEmpty();
+
+DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+assertThat(t3.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap4.snapshotId());
+assertThat(t3.file().path()).as("Data file must 
match").isEqualTo(FILE_A2.path());
+assertThat(t3.addedDeletes().size()).as("Number of added delete files must 
match").isEqualTo(2);
+assertThat(t3.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A2_DELETES.path());
+assertThat(t3.addedDeletes().get(1).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A_DELETES.path());
+assertThat(t3.existingDeletes()).as("Must be no existing 
deletes").isEmpty();
+
+DeletedRowsScanTask t4 = (DeletedRowsScanTask) tasks.get(3);
+assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+assertThat(t4.commitSnapshotId()).as("Snapshot must 
match").isEqualTo(snap4.snapshotId());
+assertThat(t4.file().path()).as("Data file must 
match").isEqualTo(FILE_A.path());
+assertThat(t4.addedDeletes().size()).as("Number of added delete files must 
match").isEqualTo(2);
+assertThat(t4.addedDeletes().get(0).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A2_DELETES.path());
+assertThat(t4.addedDeletes().get(1).path())
+.as("Added delete file must match")
+.isEqualTo(FILE_A_DELETES.path());
+assertThat(t4.existingDeletes()).as("Must be no existing 
deletes").isEmpty();

Review Comment:
   I believe this is the core of the test I am looking for! I think there's 
some feature we can leverage in AssertJ, so I took a stab at organizing this in 
a slightly different way.
   
   I do notice that we use the file path instead of the object itself for 
asserting equality on data and delete files, probably because once added to 
the table, their data and file sequence numbers are inherited from the manifest 
entry. I think we can probably reuse this `FILE_COMPARISON_CONFIG` from another 
class, 
[`TestManifestReader`](https://github.com/apache/iceberg/blob/5ea78e3fbe5d8c9c846a1b86bcd2b77d13a31acd/core/src/test/java/org/apache/iceberg/TestManifestReader.java#L40)
   
   ```java
 private static final RecursiveComparisonConfiguration FILE_COMPARISON_CONFIG =
     RecursiveComparisonConfiguration.builder()
         .withIgnoredFields(
             "dataSequenceNumber", "fileOrdinal", "fileSequenceNumber", "fromProj

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-17 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1764249470


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +128,124 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
-private final Map snapshotOrdinals;
+private DummyChangelogScanTask() {}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final long sequenceNumber;
+private final int changeOrdinal;
+
+CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int 
changeOrdinal) {
+  this.snapshotId = snapshotId;
+  this.sequenceNumber = sequenceNumber;
+  this.changeOrdinal = changeOrdinal;
 }
 
 @Override
 public CloseableIterable apply(
 CloseableIterable> entries, TaskContext 
context) {
 
-  return CloseableIterable.transform(
-  entries,
-  entry -> {
-long commitSnapshotId = entry.snapshotId();
-int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-switch (entry.status()) {
-  case ADDED:
-return new BaseAddedRowsScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  case DELETED:
-return new BaseDeletedDataFileScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  default:
-throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-}
-  });
+  CloseableIterable tasks =
+  CloseableIterable.transform(
+  entries,
+  entry -> {
+long entrySnapshotId = entry.snapshotId();
+DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
+List added = Lists.newArrayList();
+List existing = Lists.newArrayList();
+for (DeleteFile deleteFile : deleteFiles) {
+  if (sequenceNumber == deleteFile.dataSequenceNumber()) {
+added.add(deleteFile);
+  } else {
+existing.add(deleteFile);
+  }
+}
+DeleteFile[] addedDeleteFiles = added.toArray(new 
DeleteFile[0]);
+DeleteFile[] existingDeleteFiles = existing.toArray(new 
DeleteFile[0]);

Review Comment:
   @pvary this was the bug I mentioned that I needed to fix before pushing a 
change with additional tests in `TestBaseIncrementalChangelogScan`.
   In `TestBaseIncrementalChangelogScan`, in 
`testDeletingDataFileWithExistingDeletes` and 
`testDeletingRowsInDataFileWithExistingDeletes`, if the 
`IncrementalChangelogScan` was from `snap1` to `snap3`, the bug was obscured; 
but with a scan from `snap2` to `snap3`, those tests revealed the bug.
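   For illustration, the narrower range that exposed the bug looks like this in 
those tests (a sketch reusing the snapshot variables and helpers shown earlier 
in this thread):
   
   ```java
   // Scanning from snap2 (exclusive) to snap3 makes snap2's delete files
   // "existing" rather than "added", which is the edge case that broke.
   IncrementalChangelogScan scan =
       newScan().fromSnapshotExclusive(snap2.snapshotId()).toSnapshot(snap3.snapshotId());
   List<ChangelogScanTask> tasks = plan(scan);
   ```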






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-17 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1763998376


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +128,124 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
-private final Map snapshotOrdinals;
+private DummyChangelogScanTask() {}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final long sequenceNumber;
+private final int changeOrdinal;
+
+CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int 
changeOrdinal) {
+  this.snapshotId = snapshotId;
+  this.sequenceNumber = sequenceNumber;
+  this.changeOrdinal = changeOrdinal;
 }
 
 @Override
 public CloseableIterable apply(
 CloseableIterable> entries, TaskContext 
context) {
 
-  return CloseableIterable.transform(
-  entries,
-  entry -> {
-long commitSnapshotId = entry.snapshotId();
-int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-switch (entry.status()) {
-  case ADDED:
-return new BaseAddedRowsScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  case DELETED:
-return new BaseDeletedDataFileScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  default:
-throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-}
-  });
+  CloseableIterable tasks =
+  CloseableIterable.transform(
+  entries,
+  entry -> {
+long entrySnapshotId = entry.snapshotId();
+DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
+List added = Lists.newArrayList();
+List existing = Lists.newArrayList();
+for (DeleteFile deleteFile : deleteFiles) {
+  if (sequenceNumber == deleteFile.dataSequenceNumber()) {
+added.add(deleteFile);
+  } else {
+existing.add(deleteFile);
+  }
+}
+DeleteFile[] addedDeleteFiles = added.toArray(new 
DeleteFile[0]);
+DeleteFile[] existingDeleteFiles = existing.toArray(new 
DeleteFile[0]);

Review Comment:
   Previously, I used a map from delete files to the snapshots where they were 
added to determine the `addedDeleteFiles` and `existingDeleteFiles`. However, 
that map was computed only for the snapshots in the range of interest (for the 
`BaseIncrementalChangelogScan`), so there may be no entry in the map for a 
delete file added in a snapshot before this range. This caused issues in the 
`FluentIterable::filter(Predicate)` I was using to filter the delete files. 
This approach is simpler, and turns out to be what @manuzhang used in his 
https://github.com/apache/iceberg/pull/9888. Hat tip to Manu.
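   Concretely, the old filter could look up a delete file that had no entry in 
the map; a minimal sketch of the failure mode (hypothetical names matching the 
earlier revision of this PR):
   
   ```java
   // deleteFileToSnapshotOrdinal only covered snapshots inside the scan range,
   // so a delete file committed before the range had no entry and get() returned
   // null; comparing that null Integer against an int ordinal then threw an NPE.
   Integer ordinal = deleteFileToSnapshotOrdinal.get(deleteFile.path().toString());
   boolean addedInThisSnapshot = ordinal != null && ordinal == changeOrdinal;
   ```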






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-17 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1763979054


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)

Review Comment:
   These methods have been removed in the latest version, so this is no longer 
relevant.






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-17 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1763976533


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,139 @@ public void testFileDeletes() {
 assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testPositionDeletes() {

Review Comment:
   Please look at the new consolidated test, `testRowDeletes`. I think that at 
the level at which we can test `BaseIncrementalChangelogScan`, it doesn't 
matter whether delete files are position delete files or equality delete files 
(the distinction only matters when we actually apply the deletes, which doesn't 
happen at this level), so `testPositionDeletes` and `testEqualityDeletes` 
didn't really test different cases.
   






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-17 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1762795906


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,139 @@ public void testFileDeletes() {
 assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testPositionDeletes() {

Review Comment:
   @wypoon: We usually try to create different tests for different test cases, 
so your original approach seems nice. Could we remove the code duplication with 
parameterized tests, or by externalizing the duplicated code into helper 
methods? A sketch of the parameterization idea is below.
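   A minimal JUnit 5 sketch, assuming the table fixture and file constants from 
the test class; the actual test base uses `@TestTemplate`, so this is 
illustrative only:
   
   ```java
   import java.util.stream.Stream;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.DeleteFile;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.Arguments;
   import org.junit.jupiter.params.provider.MethodSource;
   
   // One parameterized test covers both the position-delete and the
   // equality-delete cases with shared setup and assertions.
   @ParameterizedTest
   @MethodSource("rowDeleteCases")
   void testRowDeletes(DataFile dataFile, DeleteFile deleteFile) {
     table.newFastAppend().appendFile(dataFile).commit();
     table.newRowDelta().addDeletes(deleteFile).commit();
     // ... shared assertions on the planned DeletedRowsScanTask ...
   }
   
   static Stream<Arguments> rowDeleteCases() {
     return Stream.of(
         Arguments.of(FILE_B, FILE_B_DELETES),    // position delete
         Arguments.of(FILE_C, FILE_C2_DELETES));  // equality delete
   }
   ```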
   






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-16 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1762263665


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,139 @@ public void testFileDeletes() {
 assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testPositionDeletes() {

Review Comment:
   Thank you for the review.
   I consolidated `testPositionDeletes`, `testEqualityDeletes` and 
`testMixOfPositionAndEqualityDeletes` into one test to reduce repetition as 
well as to test a scan with multiple snapshots. I also added tests for scan 
tasks with existing deletes.
   In the process, I discovered a bug in some edge cases and need to work on a 
fix before I push an update.






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-14 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1759686766


##
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##
@@ -132,6 +131,139 @@ public void testFileDeletes() {
 assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
   }
 
+  @TestTemplate
+  public void testPositionDeletes() {

Review Comment:
   Nice stuff!
   Could we have 2 more test cases?
   1. It would be nice to test the existing-deletes scenario
   2. It would be nice to test when the scan returns results for multiple 
snapshots
   
   Thanks,
   Peter






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-14 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1759685604


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List dataManifests = 
snapshot.dataManifests(table().io());
+  List deleteManifests = 
snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {

Review Comment:
   Thanks!
   I missed that we are iterating through the manifest files, and not the 
elements of the files.






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-13 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1759573665


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List dataManifests = 
snapshot.dataManifests(table().io());
+  List deleteManifests = 
snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {

Review Comment:
   I see where you're coming from.
   Before this change, there is only one call to `ManifestGroup:plan`. The 
`ManifestGroup` contains manifests from all the snapshots in the range of 
interest, only manifest entries added in one of these snapshots are considered 
and manifests with only existing entries are ignored; then one call to `plan` 
generates the scan tasks. This approach only works for the copy-on-write case 
where there are no delete files; in the copy-on-write case, when deletes occur, 
new data files are always added and so will show up in the `ManifestGroup`. 
When we have delete files, a situation as you describe can easily arise, where 
there is no new data file added in a snapshot, only new delete files. That is 
why we need a different approach.
   With this change, there is a `ManifestGroup` created for each snapshot in 
the range of interest and a call to `ManifestGroup:plan` for it; then the 
`CloseableIterable` of scan tasks for all of these are concatenated into one 
`CloseableIterable`. We no longer ignore data manifests with existing entries; 
in fact, we have to consider them. In a snapshot with no added data files, just 
added delete files, we consider the existing data files and see if any deletes 
apply to them.
   Now, each call to `ManifestGroup::plan` is still only parallelizable 
according to the number of data manifest files in that `ManifestGroup`. 
Additional parallelism is possible in principle: since we are making multiple 
`ManifestGroup::plan` calls (on different `ManifestGroup` instances), we could 
distribute each unit of work from every call to a single executor service. But 
I'm not aware that we have the API for that; it is not so easy to do with the 
API we have, I think.
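   A minimal sketch of the concatenation described above, assuming `plans` is 
the per-snapshot iterable of planned tasks (as in the diff quoted earlier in 
this thread):
   
   ```java
   import org.apache.iceberg.io.CloseableIterable;
   
   // Each snapshot contributes its own CloseableIterable of changelog tasks;
   // concat lazily chains them into a single iterable for the caller.
   CloseableIterable<ChangelogScanTask> allTasks = CloseableIterable.concat(plans);
   ```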




Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2348057437

   > @pvary I added some tests in `TestBaseIncrementalChangelogScan`. However, 
at that level, we can only check what scan tasks (`AddedRowsScanTask`, 
`DeletedRowsScanTask`, `DeletedDataFileScanTask`) are generated. We cannot 
actually check the kind of results as we can in the Spark 
`TestChangelogReader`. Right now, the only consumer of 
`IncrementalChangelogScan` is Spark, so some of the functionality needs to be 
tested through Spark.
   
   Makes sense. Give me some time to review the added tests.





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758196928


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List dataManifests = 
snapshot.dataManifests(table().io());
+  List deleteManifests = 
snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {

Review Comment:
   I am concerned about a situation where there are no new data files in a 
snapshot.
   Let's say we only delete a single row with either a positional or an 
equality delete. How do we handle this situation? Or what if we have multiple 
of these delete files, but no new data files at all?
   
   Thanks for your commitment and efforts. Great to see this feature 
progressing!






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2348055779

   @pvary I added some tests in `TestBaseIncrementalChangelogScan`. However, at 
that level, we can only check what scan tasks (`AddedRowsScanTask`, 
`DeletedRowsScanTask`, `DeletedDataFileScanTask`) are generated. We cannot 
actually check the kind of results as we can in the Spark 
`TestChangelogReader`. Right now, the only consumer of 
`IncrementalChangelogScan` is Spark, so some of the functionality needs to be 
tested through Spark.






Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758193054


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+  .toArray(DeleteFile.class);
 }
 
 @Override
 public CloseableIterable apply(
 CloseableIterable> entries, TaskContext 
context) {
 
-  return CloseableIterable.transform(
-  entries,
-  entry -> {
-long commitSnapshotId = entry.snapshotId();
-int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-switch (entry.status()) {
-  case ADDED:
-return new BaseAddedRowsScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  case DELETED:
-return new BaseDeletedDataFileScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  default:
-throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-}
-  });
+  CloseableIterable tasks =
+  CloseableIterable.transform(
+  entries,
+  entry -> {
+long entrySnapshotId = entry.snapshotId();
+DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry));
+
+switch (entry.status()) {
+  case ADDED:
+if (entrySnapshotId == snapshotId) {
+  return new BaseAddedRowsScanTask(
+  changeOrdinal,
+  snapshotId,
+  dataFile,
+  addedDeleteFiles,
+  context.schemaAsString(),
+  context.specAsString(),
+ 

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758191734


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List dataManifests = 
snapshot.dataManifests(table().io());
+  List deleteManifests = 
snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {

Review Comment:
   As I wrote, a unit of work is processing a data manifest file. If we have 
multiple data manifest files (`dataManifests.size() > 1` is true), they can be 
parallelized. For each unit of work, we produce scan tasks for the manifest 
entries in the manifest. Each manifest entry (a data file) can have some number 
of delete files (whether position or equality deletes) associated with it, but 
that number doesn't affect the parallelizability of the work.
   I'm really not sure I understand your question.
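   For illustration, a minimal, self-contained sketch (not the PR's code; all names here are made up) of the parallelism model described above: the executor fans out over data manifest files, and the number of delete files attached to any single manifest entry adds no further units of work.
   ```
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ManifestPlanningSketch {
  public static void main(String[] args) throws Exception {
    // Stand-ins for data manifest files; each one is a unit of planning work.
    List<String> dataManifests = List.of("manifest-1.avro", "manifest-2.avro", "manifest-3.avro");

    ExecutorService planExecutor = Executors.newFixedThreadPool(4);
    try {
      List<Callable<String>> units =
          dataManifests.stream()
              .map(m -> (Callable<String>) () -> "scan tasks planned from " + m)
              .collect(Collectors.toList());

      // invokeAll parallelizes across manifests; the delete files attached to
      // any single manifest entry do not create additional units of work.
      for (Future<String> result : planExecutor.invokeAll(units)) {
        System.out.println(result.get());
      }
    } finally {
      planExecutor.shutdown();
    }
  }
}
   ```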



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758186873


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+  .toArray(DeleteFile.class);
 }
 
 @Override
 public CloseableIterable apply(
 CloseableIterable> entries, TaskContext 
context) {
 
-  return CloseableIterable.transform(
-  entries,
-  entry -> {
-long commitSnapshotId = entry.snapshotId();
-int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-switch (entry.status()) {
-  case ADDED:
-return new BaseAddedRowsScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  case DELETED:
-return new BaseDeletedDataFileScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  default:
-throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-}
-  });
+  CloseableIterable tasks =
+  CloseableIterable.transform(
+  entries,
+  entry -> {
+long entrySnapshotId = entry.snapshotId();
+DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry));
+
+switch (entry.status()) {
+  case ADDED:
+if (entrySnapshotId == snapshotId) {
+  return new BaseAddedRowsScanTask(
+  changeOrdinal,
+  snapshotId,
+  dataFile,
+  addedDeleteFiles,
+  context.schemaAsString(),
+  context.specAsString(),
+ 

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1758181104


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List dataManifests = 
snapshot.dataManifests(table().io());
+  List deleteManifests = 
snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {

Review Comment:
   How does the planning handle the case where a snapshot has only a few equality delete files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757711124


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+  .toArray(DeleteFile.class);
 }
 
 @Override
 public CloseableIterable apply(
 CloseableIterable> entries, TaskContext 
context) {
 
-  return CloseableIterable.transform(
-  entries,
-  entry -> {
-long commitSnapshotId = entry.snapshotId();
-int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-switch (entry.status()) {
-  case ADDED:
-return new BaseAddedRowsScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  case DELETED:
-return new BaseDeletedDataFileScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  default:
-throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-}
-  });
+  CloseableIterable tasks =
+  CloseableIterable.transform(
+  entries,
+  entry -> {
+long entrySnapshotId = entry.snapshotId();
+DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry));
+
+switch (entry.status()) {
+  case ADDED:
+if (entrySnapshotId == snapshotId) {
+  return new BaseAddedRowsScanTask(
+  changeOrdinal,
+  snapshotId,
+  dataFile,
+  addedDeleteFiles,
+  context.schemaAsString(),
+  context.specAsString(),
+

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757707990


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+  .toArray(DeleteFile.class);
 }
 
 @Override
 public CloseableIterable apply(
 CloseableIterable> entries, TaskContext 
context) {
 
-  return CloseableIterable.transform(
-  entries,
-  entry -> {
-long commitSnapshotId = entry.snapshotId();
-int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-switch (entry.status()) {
-  case ADDED:
-return new BaseAddedRowsScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  case DELETED:
-return new BaseDeletedDataFileScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  default:
-throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-}
-  });
+  CloseableIterable tasks =
+  CloseableIterable.transform(
+  entries,
+  entry -> {
+long entrySnapshotId = entry.snapshotId();
+DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry));

Review Comment:
   You are right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757706938


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)

Review Comment:
   I'm not sure I understand the suggestion. Perhaps you can elaborate?
   I thought the code here is quite clear and the intention is apparent from 
the method name. Thus `filterAdded` filters for delete files whose snapshot 
ordinal is the same as (`==`) `changeOrdinal`, while `filterExisting` filters 
for delete files whose snapshot ordinal comes before (`<`) `changeOrdinal`.
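   To make the distinction concrete, a tiny self-contained example (with made-up paths and ordinals standing in for `DeleteFile`s) of what each filter selects when `changeOrdinal` is 2:
   ```
import java.util.Map;

public class DeleteFilterSemanticsSketch {
  public static void main(String[] args) {
    // Delete file path -> ordinal of the snapshot that added it (assumed values).
    Map<String, Integer> deleteFileToSnapshotOrdinal =
        Map.of("d1.parquet", 0, "d2.parquet", 1, "d3.parquet", 2);
    int changeOrdinal = 2; // ordinal of the snapshot being processed

    deleteFileToSnapshotOrdinal.forEach(
        (path, ordinal) -> {
          boolean added = ordinal == changeOrdinal;   // kept by filterAdded
          boolean existing = ordinal < changeOrdinal; // kept by filterExisting
          System.out.printf("%s added=%b existing=%b%n", path, added, existing);
        });
    // d3.parquet is "added" (committed in this snapshot); d1 and d2 are "existing".
  }
}
   ```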



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757701740


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##
@@ -112,13 +149,62 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t
   CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+    deletes.filter(rows(task, deletes.requiredSchema())),
+    row -> {
+      InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+      for (int i = 0; i < columns.length; i++) {
+        expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+      }
+
+      return expectedRow;
+    });

Review Comment:
   I'm adding a method `projectRow`, so that the transform can be written as 
`row -> projectRow(indexes)`.
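   For reference, one plausible shape for that helper (an assumption for illustration, mirroring the loop in the quoted diff; `columns` and `sparkColumnTypes` are the reader's existing fields), with the transform then written as `row -> projectRow(row, indexes)`:
   ```
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

// Hypothetical helper inside ChangelogRowReader.
private InternalRow projectRow(InternalRow row, int[] indexes) {
  InternalRow expectedRow = new GenericInternalRow(columns.length);
  for (int i = 0; i < columns.length; i++) {
    // Copy each requested column of the source row into the projected row.
    expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
  }
  return expectedRow;
}
   ```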



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757688389


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##
@@ -112,13 +149,62 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t
   CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+    deletes.filter(rows(task, deletes.requiredSchema())),
+    row -> {
+      InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+      for (int i = 0; i < columns.length; i++) {
+        expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+      }
+
+      return expectedRow;
+    });

Review Comment:
   The code looks the same, but it is not the same transform: each is a closure, and `indexes` differs between the closures, since `indexes` is derived from a different `SparkDeleteFilter` in each case.
   I didn't try too hard to avoid the code repetition here, as I didn't feel it was worth the effort, but let me think a bit more.
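   A tiny standalone illustration of that point: two lambdas with identical source text capture different arrays, so they are distinct functions.
   ```
import java.util.function.IntUnaryOperator;

public class ClosureCaptureSketch {
  static IntUnaryOperator projector(int[] indexes) {
    // The lambda body reads the same each time, but every returned closure
    // captures its own `indexes` array.
    return i -> indexes[i];
  }

  public static void main(String[] args) {
    IntUnaryOperator first = projector(new int[] {2, 0, 1});
    IntUnaryOperator second = projector(new int[] {1, 2, 0});
    System.out.println(first.applyAsInt(0));  // 2
    System.out.println(second.applyAsInt(0)); // 1
  }
}
   ```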



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757679042


##
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java:
##
@@ -191,39 +214,359 @@ public void testMixDeleteAndInsert() throws IOException {
 table.newAppend().appendFile(dataFile2).commit();
 long snapshotId3 = table.currentSnapshot().snapshotId();
 
-CloseableIterable> taskGroups = 
newScan().planTasks();
+List rows = getChangelogRows(table);
+
+List expectedRows = Lists.newArrayList();
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, 
records2);
+
+assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testPositionDeletes() throws IOException {
+table.newAppend().appendFile(dataFile1).commit();
+long snapshotId1 = table.currentSnapshot().snapshotId();
+
+table.newAppend().appendFile(dataFile2).commit();
+long snapshotId2 = table.currentSnapshot().snapshotId();
+
+List> deletes =
+Lists.newArrayList(
+Pair.of(dataFile1.path(), 0L), // id = 29
+Pair.of(dataFile1.path(), 3L), // id = 89
+Pair.of(dataFile2.path(), 2L) // id = 122
+);
+
+Pair posDeletes =
+FileHelpers.writeDeleteFile(
+table,
+Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+TestHelpers.Row.of(0),
+deletes);
+
+table
+.newRowDelta()
+.addDeletes(posDeletes.first())
+.validateDataFilesExist(posDeletes.second())
+.commit();
+long snapshotId3 = table.currentSnapshot().snapshotId();
+
+List rows = getChangelogRows(table);
+
+List expectedRows = Lists.newArrayList();
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
records2);
+addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, 
records3);
+
+assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testEqualityDeletes() throws IOException {
+table.newAppend().appendFile(dataFile1).commit();
+long snapshotId1 = table.currentSnapshot().snapshotId();
+
+table.newAppend().appendFile(dataFile2).commit();
+long snapshotId2 = table.currentSnapshot().snapshotId();
+
+Schema deleteRowSchema = table.schema().select("data");
+Record dataDelete = GenericRecord.create(deleteRowSchema);
+List dataDeletes =
+Lists.newArrayList(
+dataDelete.copy("data", "a"), // id = 29
+dataDelete.copy("data", "d"), // id = 89
+dataDelete.copy("data", "g") // id = 122
+);
+
+DeleteFile eqDeletes =
+FileHelpers.writeDeleteFile(
+table,
+Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+TestHelpers.Row.of(0),
+dataDeletes,
+deleteRowSchema);
+
+table.newRowDelta().addDeletes(eqDeletes).commit();
+long snapshotId3 = table.currentSnapshot().snapshotId();
+
+List rows = getChangelogRows(table);
+
+List expectedRows = Lists.newArrayList();
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
records2);
+addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, 
records3);
+
+assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testMixOfPositionAndEqualityDeletes() throws IOException {
+table.newAppend().appendFile(dataFile1).commit();
+long snapshotId1 = table.currentSnapshot().snapshotId();
+
+table.newAppend().appendFile(dataFile2).commit();
+long snapshotId2 = table.currentSnapshot().snapshotId();
+
+List> deletes =
+Lists.newArrayList(
+Pair.of(dataFile1.path(), 0L), // id = 29
+Pair.of(dataFile1.path(), 3L) // id = 89
+);
+
+Pair posDeletes =
+FileHelpers.writeDeleteFile(
+table,
+Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+TestHelpers.Row.of(0),
+deletes);
+
+Schema deleteRowSchema = table.schema().select("data");
+Record dataDelete = GenericRecord.create(deleteRowSchema);
+List dataDeletes =
+Lists.newArrayList(
+dataDelete.copy("data", "a"), // id = 29
+dataDelete.copy("data", "g") // id = 122
+);
+
+DeleteFile eqDeletes =
+FileHelpers.writeDeleteFile(
+

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-12 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757391061


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List dataManifests = 
snapshot.dataManifests(table().io());
+  List deleteManifests = 
snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {

Review Comment:
   The parallelism in `ManifestGroup::plan` is at the level of data manifest 
files, i.e., the unit of work is processing a data manifest file. So asking for 
planning with an executor service if there are many equality delete files does 
not help if there is only one data manifest file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-11 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1755259576


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);

Review Comment:
   Yeah, I agree with the assessment.



##
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java:
##
@@ -191,39 +214,359 @@ public void testMixDeleteAndInsert() throws IOException {
 table.newAppend().appendFile(dataFile2).commit();
 long snapshotId3 = table.currentSnapshot().snapshotId();
 
-CloseableIterable> taskGroups = 
newScan().planTasks();
+List rows = getChangelogRows(table);
+
+List expectedRows = Lists.newArrayList();
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, 
records2);
+
+assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testPositionDeletes() throws IOException {
+table.newAppend().appendFile(dataFile1).commit();
+long snapshotId1 = table.currentSnapshot().snapshotId();
+
+table.newAppend().appendFile(dataFile2).commit();
+long snapshotId2 = table.currentSnapshot().snapshotId();
+
+List> deletes =
+Lists.newArrayList(
+Pair.of(dataFile1.path(), 0L), // id = 29
+Pair.of(dataFile1.path(), 3L), // id = 89
+Pair.of(dataFile2.path(), 2L) // id = 122
+);
+
+Pair posDeletes =
+FileHelpers.writeDeleteFile(
+table,
+Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+TestHelpers.Row.of(0),
+deletes);
+
+table
+.newRowDelta()
+.addDeletes(posDeletes.first())
+.validateDataFilesExist(posDeletes.second())
+.commit();
+long snapshotId3 = table.currentSnapshot().snapshotId();
+
+List rows = getChangelogRows(table);
+
+List expectedRows = Lists.newArrayList();
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
records2);
+addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, 
records3);
+
+assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testEqualityDeletes() throws IOException {
+table.newAppend().appendFile(dataFile1).commit();
+long snapshotId1 = table.currentSnapshot().snapshotId();
+
+table.newAppend().appendFile(dataFile2).commit();
+long snapshotId2 = table.currentSnapshot().snapshotId();
+
+Schema deleteRowSchema = table.schema().select("data");
+Record dataDelete = GenericRecord.create(deleteRowSchema);
+List dataDeletes =
+Lists.newArrayList(
+dataDelete.copy("data", "a"), // id = 29
+dataDelete.copy("data", "d"), // id = 89
+dataDelete.copy("data", "g") // id = 122
+);
+
+DeleteFile eqDeletes =
+FileHelpers.writeDeleteFile(
+table,
+Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+TestHelpers.Row.of(0),
+dataDeletes,
+deleteRowSchema);
+
+table

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-10 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1753070517


##
gradle.properties:
##
@@ -32,4 +32,4 @@ org.gradle.parallel=true
 org.gradle.configureondemand=true
 # explicitly disable the configuration cache
 org.gradle.configuration-cache=false
-org.gradle.jvmargs=-Xmx1024m
+org.gradle.jvmargs=-Xmx2048m

Review Comment:
   I reverted the gradle change and the CI passed. It appears to be an intermittent issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-06 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2335008435

   I did not have time to come back to this work this week as I hoped, but I 
think I should have some time next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-06 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1747872923


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);

Review Comment:
   I suspect that when the dust settles on the V3 spec, there will be changes needed to the changelog implementation, and not just in this part. I agree that we should keep the proposed changes in mind, but I don't think we should hold up implementing something for V2. We will have to deal with V3 changes when the time comes.
   Right now, the scan tasks we have to work with take `DeleteFile[]` 
parameters, as they are designed for V2. So for V2 this map makes sense for use 
in filtering the `DeleteFile`s to pass to those scan task classes.
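   As a self-contained sketch of the map in question (plain collections standing in for Iceberg's `Snapshot` and `DeleteFile` types, with made-up paths), each delete file path is keyed to the ordinal of the snapshot that added it:
   ```
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeleteOrdinalMapSketch {
  public static void main(String[] args) {
    // snapshotId -> delete file paths added in that snapshot
    // (a stand-in for snapshot.addedDeleteFiles(io)).
    Map<Long, List<String>> addedDeletes = new LinkedHashMap<>();
    addedDeletes.put(100L, List.of());
    addedDeletes.put(200L, List.of("s3://t/deletes/d1.parquet"));

    // snapshotId -> ordinal, in commit order (as computeSnapshotOrdinals does).
    Map<Long, Integer> snapshotOrdinals = new LinkedHashMap<>();
    int ordinal = 0;
    for (Long snapshotId : addedDeletes.keySet()) {
      snapshotOrdinals.put(snapshotId, ordinal++);
    }

    // delete file path -> ordinal of the snapshot that added it.
    Map<String, Integer> deleteFileToSnapshotOrdinal = new HashMap<>();
    addedDeletes.forEach(
        (snapshotId, paths) ->
            paths.forEach(
                path -> deleteFileToSnapshotOrdinal.put(path, snapshotOrdinals.get(snapshotId))));

    System.out.println(deleteFileToSnapshotOrdinal); // {s3://t/deletes/d1.parquet=1}
  }
}
   ```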



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-06 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1747869556


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##
@@ -112,13 +149,62 @@ private CloseableIterable 
openChangelogScanTask(ChangelogScanTask t
   CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) 
{
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, 
task.deletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+deletes.filter(rows(task, deletes.requiredSchema())),
+row -> {
+  InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+  for (int i = 0; i < columns.length; i++) {
+expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+  }
+
+  return expectedRow;
+});
   }
 
   private CloseableIterable 
openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, 
task.existingDeletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+deletes.filter(rows(task, deletes.requiredSchema())),
+row -> {
+  InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+  for (int i = 0; i < columns.length; i++) {
+expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+  }
+
+  return expectedRow;
+});
+  }
+
+  private CloseableIterable 
openDeletedRowsScanTask(DeletedRowsScanTask task) {
+String filePath = task.file().path().toString();
+SparkDeleteFilter existingDeletes =
+new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
+SparkDeleteFilter newDeletes = new SparkDeleteFilter(filePath, 
task.addedDeletes(), counter());
+Schema schema1 = existingDeletes.requiredSchema();
+Schema schema2 = newDeletes.requiredSchema();
+Schema requiredSchema = TypeUtil.join(schema1, schema2);

Review Comment:
   Sure, that is a minor thing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-09-01 Thread via GitHub


manuzhang commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1740294635


##
gradle.properties:
##
@@ -32,4 +32,4 @@ org.gradle.parallel=true
 org.gradle.configureondemand=true
 # explicitly disable the configuration cache
 org.gradle.configuration-cache=false
-org.gradle.jvmargs=-Xmx1024m
+org.gradle.jvmargs=-Xmx2048m

Review Comment:
   Did it just fail here? What's changed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-27 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2313986585

   @pvary and @dramaticlly, thank you both for reviewing. I am busy with some 
other work at the moment, but I'll return to this by next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-27 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733659899


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##
@@ -112,13 +149,62 @@ private CloseableIterable 
openChangelogScanTask(ChangelogScanTask t
   CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) 
{
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, 
task.deletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+deletes.filter(rows(task, deletes.requiredSchema())),
+row -> {
+  InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+  for (int i = 0; i < columns.length; i++) {
+expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+  }
+
+  return expectedRow;
+});
   }
 
   private CloseableIterable 
openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, 
task.existingDeletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+deletes.filter(rows(task, deletes.requiredSchema())),
+row -> {
+  InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+  for (int i = 0; i < columns.length; i++) {
+expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+  }
+
+  return expectedRow;
+});
+  }
+
+  private CloseableIterable 
openDeletedRowsScanTask(DeletedRowsScanTask task) {
+String filePath = task.file().path().toString();
+SparkDeleteFilter existingDeletes =
+new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
+SparkDeleteFilter newDeletes = new SparkDeleteFilter(filePath, 
task.addedDeletes(), counter());
+Schema schema1 = existingDeletes.requiredSchema();
+Schema schema2 = newDeletes.requiredSchema();
+Schema requiredSchema = TypeUtil.join(schema1, schema2);

Review Comment:
   Can we inline this to
   ```
Schema requiredSchema = TypeUtil.join(existingDeletes.requiredSchema(), 
newDeletes.requiredSchema());
   ```
   
   since `schema1` and `schema2` are not used anywhere else below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-27 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733661588


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map<String, Integer> computeDeleteFileToSnapshotOrdinal(
+  Deque<Snapshot> snapshots, Map<Long, Integer> snapshotOrdinals) {
+Map<String, Integer> deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable<DeleteFile> deleteFiles = snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map<Long, Integer> snapshotOrdinals;
+private final Map<String, Integer> deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map<Long, Integer> snapshotOrdinals,
+Map<String, Integer> deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)

Review Comment:
   Can we use a comparator here and at line 190 to highlight the difference between added deletes and existing deletes? Right now you are comparing the snapshot ordinals, but the intent is hard to see because the only difference between the two methods is `==` versus `<`.
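   For illustration, a minimal sketch of one way to express that (an in-class helper; `filterDeletes` is a hypothetical name, and `java.util.function.IntPredicate` would be the only new import):
   ```
// Sketch only: a single filter whose call sites spell out the added-vs-existing
// intent, instead of two methods that differ by a single operator.
private DeleteFile[] filterDeletes(DeleteFile[] deleteFiles, IntPredicate ordinalMatches) {
  return FluentIterable.from(deleteFiles)
      .filter(
          deleteFile ->
              ordinalMatches.test(
                  deleteFileToSnapshotOrdinal.get(deleteFile.path().toString())))
      .toArray(DeleteFile.class);
}

// Call sites:
// DeleteFile[] added = filterDeletes(deleteFiles, ordinal -> ordinal == changeOrdinal);
// DeleteFile[] existing = filterDeletes(deleteFiles, ordinal -> ordinal < changeOrdinal);
   ```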



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-27 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733677814


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);
 
-Set<ManifestFile> newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal
+Map<String, Integer> deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals);

Review Comment:
   I think this might work for V2, but @aokolnychyi recently proposed a [V3 improvement for position deletes](https://docs.google.com/document/d/18Bqhr-vnzFfQk1S4AgRISkA_5_m5m32Nnc2Cw0zn2XM) that leverages Puffin files, where many delete files can share the same path but have different offsets. Keying by path, or any path-based equality, might therefore not be the best choice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-27 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733661339


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+  .toArray(DeleteFile.class);
 }

Review Comment:
   Can we use a comparator here to highlight the difference between added deletes and existing deletes? You can compare with the snapshot ordinal, but the intent is hard to see because the only difference is `==` versus `<`.



##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-27 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733659899


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##
@@ -112,13 +149,62 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t
   CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, 
task.deletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+deletes.filter(rows(task, deletes.requiredSchema())),
+row -> {
+  InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+  for (int i = 0; i < columns.length; i++) {
+expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+  }
+
+  return expectedRow;
+});
   }
 
   private CloseableIterable 
openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, 
task.existingDeletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+deletes.filter(rows(task, deletes.requiredSchema())),
+row -> {
+  InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+  for (int i = 0; i < columns.length; i++) {
+expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+  }
+
+  return expectedRow;
+});
+  }
+
+  private CloseableIterable 
openDeletedRowsScanTask(DeletedRowsScanTask task) {
+String filePath = task.file().path().toString();
+SparkDeleteFilter existingDeletes =
+new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
+SparkDeleteFilter newDeletes = new SparkDeleteFilter(filePath, 
task.addedDeletes(), counter());
+Schema schema1 = existingDeletes.requiredSchema();
+Schema schema2 = newDeletes.requiredSchema();
+Schema requiredSchema = TypeUtil.join(schema1, schema2);

Review Comment:
   Can we inline this to
   ```
Schema requiredSchema = TypeUtil.join(existingDeletes.requiredSchema(), newDeletes.requiredSchema());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-27 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733658370


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##
@@ -112,13 +149,62 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t
   CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, 
task.deletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+deletes.filter(rows(task, deletes.requiredSchema())),
+row -> {
+  InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+  for (int i = 0; i < columns.length; i++) {
+expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+  }
+
+  return expectedRow;
+});

Review Comment:
   Speaking of which, I saw that this `InternalRow` projection is repeated here and below for `openDeletedDataFileScanTask`; do you know if we can put it into a method instead?
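   One possible factoring (a sketch; `project` is a hypothetical name, while `columns`, `sparkColumnTypes`, and `indexesInRow` are the existing members of `ChangelogRowReader` shown in this diff):
   ```
// Sketch: shared projection from the delete filter's required schema back to the
// requested columns, callable from each open*ScanTask method.
private CloseableIterable<InternalRow> project(
    CloseableIterable<InternalRow> rows, Schema requiredSchema) {
  int[] indexes = indexesInRow(requiredSchema);

  return CloseableIterable.transform(
      rows,
      row -> {
        InternalRow projectedRow = new GenericInternalRow(columns.length);
        for (int i = 0; i < columns.length; i++) {
          projectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
        }
        return projectedRow;
      });
}
   ```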



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-27 Thread via GitHub


dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733657338


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##
@@ -112,13 +149,62 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t
   CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, 
task.deletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+deletes.filter(rows(task, deletes.requiredSchema())),
+row -> {
+  InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+  for (int i = 0; i < columns.length; i++) {
+expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+  }
+
+  return expectedRow;
+});

Review Comment:
   Yeah, I realized this as well when reading RowDataReader; it looks like if position deletes are present, it will always add the `_pos` metadata column as required, per https://github.com/apache/iceberg/blob/684f7a767c2c216a402b60b73d2d55ef605921a0/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java#L260-L263, as well as the equality delete field ids if equality deletes are present.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-22 Thread via GitHub


pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2305694139

   Could we add some tests for `BaseIncrementalChangelogScan` directly?
   It would be good not to depend on Spark to test core functionality.
   Thanks, Peter
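   For example, a core-level test could look roughly like this (a sketch only, assuming the `TableTestBase` scaffolding and its `FILE_A`/`FILE_A_DELETES` fixtures used by other core tests, on a format v2 table):
   ```
@Test
public void testDeleteProducesDeletedRowsScanTask() throws IOException {
  table.newAppend().appendFile(FILE_A).commit();

  table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
  long deleteSnapshotId = table.currentSnapshot().snapshotId();

  IncrementalChangelogScan scan =
      table.newIncrementalChangelogScan().toSnapshot(deleteSnapshotId);

  try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
    // The commit that added the position deletes should surface as a DeletedRowsScanTask.
    assertThat(tasks).anyMatch(task -> task instanceof DeletedRowsScanTask);
  }
}
   ```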


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-22 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1727847483


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+  .toArray(DeleteFile.class);
 }
 
 @Override
 public CloseableIterable<ChangelogScanTask> apply(
 CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {
 
-  return CloseableIterable.transform(
-  entries,
-  entry -> {
-long commitSnapshotId = entry.snapshotId();
-int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-switch (entry.status()) {
-  case ADDED:
-return new BaseAddedRowsScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  case DELETED:
-return new BaseDeletedDataFileScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  default:
-throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-}
-  });
+  CloseableIterable<ChangelogScanTask> tasks =
+  CloseableIterable.transform(
+  entries,
+  entry -> {
+long entrySnapshotId = entry.snapshotId();
+DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry));
+
+switch (entry.status()) {
+  case ADDED:
+if (entrySnapshotId == snapshotId) {
+  return new BaseAddedRowsScanTask(
+  changeOrdinal,
+  snapshotId,
+  dataFile,
+  addedDeleteFiles,
+  context.schemaAsString(),
+  context.specAsString(),
+  context.residuals());

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-22 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1727828125


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -133,51 +131,149 @@ private static Map 
computeSnapshotOrdinals(Deque snapsh
 return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
-private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map computeDeleteFileToSnapshotOrdinal(
+  Deque snapshots, Map snapshotOrdinals) {
+Map deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+for (Snapshot snapshot : snapshots) {
+  Iterable deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+  for (DeleteFile deleteFile : deleteFiles) {
+deleteFileToSnapshotOrdinal.put(
+deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+  }
+}
+
+return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
+private DummyChangelogScanTask() {}
+
+@Override
+public ChangelogOperation operation() {
+  return ChangelogOperation.DELETE;
+}
+
+@Override
+public int changeOrdinal() {
+  return 0;
+}
+
+@Override
+public long commitSnapshotId() {
+  return 0L;
+}
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction {
+private final long snapshotId;
+private final int changeOrdinal;
 private final Map snapshotOrdinals;
+private final Map deleteFileToSnapshotOrdinal;
+
+CreateDataFileChangeTasks(
+long snapshotId,
+Map snapshotOrdinals,
+Map deleteFileToSnapshotOrdinal) {
+  this.snapshotId = snapshotId;
+  this.snapshotOrdinals = snapshotOrdinals;
+  this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+  this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+}
+
+private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+  .toArray(DeleteFile.class);
+}
 
-CreateDataFileChangeTasks(Deque snapshots) {
-  this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+  return FluentIterable.from(deleteFiles)
+  .filter(
+  deleteFile ->
+  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+  .toArray(DeleteFile.class);
 }
 
 @Override
 public CloseableIterable<ChangelogScanTask> apply(
 CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {
 
-  return CloseableIterable.transform(
-  entries,
-  entry -> {
-long commitSnapshotId = entry.snapshotId();
-int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-switch (entry.status()) {
-  case ADDED:
-return new BaseAddedRowsScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  case DELETED:
-return new BaseDeletedDataFileScanTask(
-changeOrdinal,
-commitSnapshotId,
-dataFile,
-NO_DELETES,
-context.schemaAsString(),
-context.specAsString(),
-context.residuals());
-
-  default:
-throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-}
-  });
+  CloseableIterable<ChangelogScanTask> tasks =
+  CloseableIterable.transform(
+  entries,
+  entry -> {
+long entrySnapshotId = entry.snapshotId();
+DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry));

Review Comment:
   We can reuse `context.deletes().forEntry(entry)` below; it might be worth adding a variable for it and reusing that.
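   i.e., something like (sketch):
   ```
// Compute the matching delete files once per entry and derive both views from it.
DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
DeleteFile[] addedDeleteFiles = filterAdded(deleteFiles);
DeleteFile[] existingDeleteFiles = filterExisting(deleteFiles);
   ```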



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-22 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1727796525


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable<CloseableIterable<ChangelogScanTask>> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
+  List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
+manifestGroup = manifestGroup.planWith(planExecutor());
+  }
+
+  long snapshotId = snapshot.snapshotId();
+  return manifestGroup.plan(
+  new CreateDataFileChangeTasks(
+  snapshotId, snapshotOrdinals, 
deleteFileToSnapshotOrdinal));

Review Comment:
   As agreed on the dev list, let's proceed with the snapshot-by-snapshot 
reader for now, and decide on the net_changes solution later



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-22 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1727409290


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List dataManifests = 
snapshot.dataManifests(table().io());
+  List deleteManifests = 
snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
+manifestGroup = manifestGroup.planWith(planExecutor());
+  }
+
+  long snapshotId = snapshot.snapshotId();
+  return manifestGroup.plan(
+  new CreateDataFileChangeTasks(
+  snapshotId, snapshotOrdinals, 
deleteFileToSnapshotOrdinal));

Review Comment:
   Ah, in this case, (b) is the correct behavior. The changelog scan is an 
incremental scan over multiple snapshots in a range, and should emit the 
changes for each snapshot. This is the current behavior for the supported case, 
which is copy-on-write. What you are seeking are the net changes, which is 
functionality that is also supported by Spark, and built on top of the 
changelog scan. This uses `ChangelogIterator.removeNetCarryovers`. This 
functionality is exposed in the Spark procedure, `create_changelog_view`. (Of 
course, one can also use it programmatically.)
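   For reference, a sketch of driving the net-changes path from Spark (argument and column names as documented for the procedure; the catalog and table names here are placeholders):
   ```
// Create a changelog view that collapses carry-overs and intermediate changes
// into net changes, then query it; the default view name is "<table>_changes".
spark.sql(
    "CALL spark_catalog.system.create_changelog_view("
        + "table => 'db.tbl', "
        + "net_changes => true)");

spark.sql("SELECT * FROM tbl_changes ORDER BY _change_ordinal").show();
   ```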



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-22 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726833603


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List dataManifests = 
snapshot.dataManifests(table().io());
+  List deleteManifests = 
snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
+manifestGroup = manifestGroup.planWith(planExecutor());
+  }
+
+  long snapshotId = snapshot.snapshotId();
+  return manifestGroup.plan(
+  new CreateDataFileChangeTasks(
+  snapshotId, snapshotOrdinals, 
deleteFileToSnapshotOrdinal));

Review Comment:
   I'm not sure that we can filter with the current snapshotId. I think we need 
to filter with the `toSnapshotId` of the scan, so we don't emit records which 
are added and deleted when the `IncrementalScan` is for multiple snapshots.
   
   I have asked the corresponding question on the dev list thread, but for 
reference:
   
   > What is the expected behaviour when the `IncrementalScan` is created for 
not a single snapshot, but for multiple snapshots?
   > - S1 added PK1-V1
   > - S2 updated PK1-V1 to PK1-V1b (removed PK1-V1 and added PK1-V1b)
   > - S3 updated PK1-V1b to PK1-V1c (removed PK1-V1b and added PK1-V1c)
   > 
   > Let's say we have IncrementalScan.fromSnapshotInclusive(S1).toSnapshot(S3).
   > Do we need to return:
   > (a)
   > - PK1,V1c,INSERTED
   > 
   > Or is it OK to return:
   > (b)
   > - PK1,V1,INSERTED
   > - PK1,V1,DELETED
   > - PK1,V1b,INSERTED
   > - PK1,V1b,DELETED
   > - PK1,V1c,INSERTED
   
   I think the (a) is the correct behaviour.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-22 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726782294


##
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##
@@ -63,33 +60,43 @@ protected CloseableIterable doPlanFiles(
   return CloseableIterable.empty();
 }
 
-Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+Map snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-Set newDataManifests =
-FluentIterable.from(changelogSnapshots)
-.transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-.filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-.toSet();
-
-ManifestGroup manifestGroup =
-new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-.specsById(table().specs())
-.caseSensitive(isCaseSensitive())
-.select(scanColumns())
-.filterData(filter())
-.filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-.ignoreExisting()
-.columnsToKeepStats(columnsToKeepStats());
-
-if (shouldIgnoreResiduals()) {
-  manifestGroup = manifestGroup.ignoreResiduals();
-}
-
-if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-  manifestGroup = manifestGroup.planWith(planExecutor());
-}
+// map of delete file to the snapshot where the delete file is added
+// the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+Map deleteFileToSnapshotOrdinal =
+computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+Iterable> plans =
+FluentIterable.from(changelogSnapshots)
+.transform(
+snapshot -> {
+  List dataManifests = 
snapshot.dataManifests(table().io());
+  List deleteManifests = 
snapshot.deleteManifests(table().io());
+
+  ManifestGroup manifestGroup =
+  new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+  .specsById(table().specs())
+  .caseSensitive(isCaseSensitive())
+  .select(scanColumns())
+  .filterData(filter())
+  .columnsToKeepStats(columnsToKeepStats());
+
+  if (shouldIgnoreResiduals()) {
+manifestGroup = manifestGroup.ignoreResiduals();
+  }
+
+  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {

Review Comment:
   Do we want to plan with an executor if we have many equality delete files?
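   If so, a small adjustment along these lines might be enough (a sketch using the names from the diff above):
   ```
// Also weigh delete manifests when deciding whether parallel planning pays off.
if (dataManifests.size() + deleteManifests.size() > 1 && shouldPlanWithExecutor()) {
  manifestGroup = manifestGroup.planWith(planExecutor());
}
   ```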



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726394464


##
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java:
##
@@ -191,39 +214,361 @@ public void testMixDeleteAndInsert() throws IOException {
 table.newAppend().appendFile(dataFile2).commit();
 long snapshotId3 = table.currentSnapshot().snapshotId();
 
-CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = newScan().planTasks();
+List<InternalRow> rows = getChangelogRows(table);
+
+List<Object[]> expectedRows = Lists.newArrayList();
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, 
records2);
+
+assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testPositionDeletes() throws IOException {
+table.newAppend().appendFile(dataFile1).commit();
+long snapshotId1 = table.currentSnapshot().snapshotId();
+
+table.newAppend().appendFile(dataFile2).commit();
+long snapshotId2 = table.currentSnapshot().snapshotId();
+
+List<Pair<CharSequence, Long>> deletes =
+Lists.newArrayList(
+Pair.of(dataFile1.path(), 0L), // id = 29
+Pair.of(dataFile1.path(), 3L), // id = 89
+Pair.of(dataFile2.path(), 2L) // id = 122
+);
+
+Pair<DeleteFile, CharSequenceSet> posDeletes =
+FileHelpers.writeDeleteFile(
+table,
+Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+TestHelpers.Row.of(0),
+deletes);
+
+table
+.newRowDelta()
+.addDeletes(posDeletes.first())
+.validateDataFilesExist(posDeletes.second())
+.commit();
+long snapshotId3 = table.currentSnapshot().snapshotId();
+
+List<InternalRow> rows = getChangelogRows(table);
+
+List<Object[]> expectedRows = Lists.newArrayList();
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
records2);
+addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, 
records3);
+
+assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testEqualityDeletes() throws IOException {
+table.newAppend().appendFile(dataFile1).commit();
+long snapshotId1 = table.currentSnapshot().snapshotId();
+
+table.newAppend().appendFile(dataFile2).commit();
+long snapshotId2 = table.currentSnapshot().snapshotId();
+
+Schema deleteRowSchema = table.schema().select("data");
+Record dataDelete = GenericRecord.create(deleteRowSchema);
+List<Record> dataDeletes =
+Lists.newArrayList(
+dataDelete.copy("data", "a"), // id = 29
+dataDelete.copy("data", "d"), // id = 89
+dataDelete.copy("data", "g") // id = 122
+);
+
+DeleteFile eqDeletes =
+FileHelpers.writeDeleteFile(
+table,
+Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+TestHelpers.Row.of(0),
+dataDeletes,
+deleteRowSchema);
+
+table.newRowDelta().addDeletes(eqDeletes).commit();
+long snapshotId3 = table.currentSnapshot().snapshotId();
+
+List<InternalRow> rows = getChangelogRows(table);
+
+List<Object[]> expectedRows = Lists.newArrayList();
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
records2);
+addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, 
records3);
+
+assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testMixOfPositionAndEqualityDeletes() throws IOException {
+table.newAppend().appendFile(dataFile1).commit();
+long snapshotId1 = table.currentSnapshot().snapshotId();
+
+table.newAppend().appendFile(dataFile2).commit();
+long snapshotId2 = table.currentSnapshot().snapshotId();
+
+List<Pair<CharSequence, Long>> deletes =
+Lists.newArrayList(
+Pair.of(dataFile1.path(), 0L), // id = 29
+Pair.of(dataFile1.path(), 3L) // id = 89
+);
+
+Pair<DeleteFile, CharSequenceSet> posDeletes =
+FileHelpers.writeDeleteFile(
+table,
+Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+TestHelpers.Row.of(0),
+deletes);
+
+Schema deleteRowSchema = table.schema().select("data");
+Record dataDelete = GenericRecord.create(deleteRowSchema);
+List<Record> dataDeletes =
+Lists.newArrayList(
+dataDelete.copy("data", "a"), // id = 29
+dataDelete.copy("data", "g") // id = 122
+);
+
+DeleteFile eqDeletes =
+FileHelpers.writeDeleteFile(
+table,
+Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+TestHelpers.Row.of(0),
+dataDeletes,
+deleteRowSchema);

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2303850176

   @pvary I have fixed the implementation so that existing deletes are applied before new deletes are emitted, and I have fixed the test case accordingly. (I also renamed `testFlinkScenario1` and `testFlinkScenario2`.) I think things work as you expect now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726387609


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##
@@ -112,13 +149,62 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t
   CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
 String filePath = task.file().path().toString();
 SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, 
task.deletes(), counter());
-return deletes.filter(rows(task, deletes.requiredSchema()));
+int[] indexes = indexesInRow(deletes.requiredSchema());
+
+return CloseableIterable.transform(
+deletes.filter(rows(task, deletes.requiredSchema())),
+row -> {
+  InternalRow expectedRow = new GenericInternalRow(columns.length);
+
+  for (int i = 0; i < columns.length; i++) {
+expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
+  }
+
+  return expectedRow;
+});

Review Comment:
   When `DeleteFilter::filter` is called, if there are deletes to be applied, 
the `requiredSchema` could have additional columns beyond the requested schema 
(e.g., `_pos` is added if there are positional deletes to be applied). Thus the 
`InternalRow` could have more columns than the requested schema, and we need to 
exclude the additional columns and only keep the ones for the requested schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726379602


##
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##
@@ -197,31 +197,31 @@ record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
   }
 
   public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
-// Predicate to test whether a row has been deleted by equality deletions.
-Predicate<T> deletedRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false);
-
-return CloseableIterable.filter(records, deletedRows);
+return CloseableIterable.filter(records, isEqDeleted());
   }
 
   private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
-Predicate<T> isEqDeleted = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false);
-
-return createDeleteIterable(records, isEqDeleted);
+return createDeleteIterable(records, isEqDeleted());
   }
 
   protected void markRowDeleted(T item) {
 throw new UnsupportedOperationException(
 this.getClass().getName() + " does not implement markRowDeleted");
   }
 
-  public Predicate<T> eqDeletedRowFilter() {
+  // Predicate to test whether a row has been deleted by equality deletes
+  public Predicate<T> isEqDeleted() {
 if (eqDeleteRows == null) {
-  eqDeleteRows =
-  applyEqDeletes().stream().map(Predicate::negate).reduce(Predicate::and).orElse(t -> true);
+  eqDeleteRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false);
 }
 return eqDeleteRows;
   }
 
+  // Predicate to test whether a row has not been deleted by equality deletes
+  public Predicate<T> eqDeletedRowFilter() {
+return isEqDeleted().negate();
+  }

Review Comment:
   This is only used in one place, 
`ColumnarBatchReader.ColumnarBatchLoader::applyEqDelete(ColumnarBatch)`.
   I propose deprecating this and changing the call to the negation of 
`isEqDeleted()` instead. `isEqDeleted()` is more useful. I'll do that in a 
separate PR.
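   Roughly (a sketch of the proposed follow-up, not part of this PR):
   ```
/**
 * Predicate to test whether a row has NOT been deleted by equality deletes.
 *
 * @deprecated use the negation of {@link #isEqDeleted()} instead
 */
@Deprecated
public Predicate<T> eqDeletedRowFilter() {
  return isEqDeleted().negate();
}
   ```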



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726371909


##
gradle.properties:
##
@@ -32,4 +32,4 @@ org.gradle.parallel=true
 org.gradle.configureondemand=true
 # explicitly disable the configuration cache
 org.gradle.configuration-cache=false
-org.gradle.jvmargs=-Xmx1024m
+org.gradle.jvmargs=-Xmx2048m

Review Comment:
   The Java CI build failed with a "Java heap space" error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302563661

   @pvary thanks for posting on the dev list thread!
   In your description of scenario 1, you wrote:
   
   > 
   
   > When the changelog scan reads the 3rd snapshot, it should consider:
   > 
   > Delete reads:
   > DF1 - omit records deleted by ED1, emit records deleted by ED2
   > DF2 - emit records deleted by ED2
   > Data read:
   > DF3 - emit all records
   
   This corresponds to case (b) in the thread. But now I understand that you 
must have made a mistake and did not really mean the
   > DF1 - omit records deleted by ED1, emit records deleted by ED2
   
   and that you think (a) is the correct behavior.
   The current behavior is (b), but I'll work on changing the implementation to 
handle this equality delete case so that the behavior is (a).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302465047

   > @pvary thanks for reviewing the tests! Just a quick response for now. For 
scenario 1, the behavior is as I thought you described, yet in your comment on 
the test code, you indicated that it is incorrect, so I am confused. To be 
honest, I thought about it some more yesterday and I actually think it is 
incorrect too. So I sent an email to the Iceberg dev list (please see that 
email), asking for clarification. The current behavior is case (b) in the 
email, which is what I thought you expected. I now think the behavior should be 
case (a). Please add your thoughts to the thread in the dev list too.
   
   Sorry for the confusion @wypoon!
   I have tried to describe the (a) case in my description with this sentence:
   > Notice that for DF1 we should not emit records which are deleted by 
previous deletes.
   
   Anyway, I added my comments to the thread as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302439337

   @pvary thanks for reviewing the tests!
   Just a quick response for now. For scenario 1, the behavior is as I thought 
you described, yet in your comment on the test code, you indicated that it is 
incorrect, so I am confused. To be honest, I thought about it some more 
yesterday and I actually think it is incorrect too. So I sent an email to the 
Iceberg dev list (please see that email), asking for clarification.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


flyrain commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302430012

   cc @dramaticlly 





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2302344676

   > @pvary I have added `testFlinkScenario1` and `testFlinkScenario2` to 
`TestChangelogReader` based on your two scenarios. Please check the expected 
results. (I will rename the tests later with more descriptive names.)
   
   These are great tests. Thanks for implementing them!
   
   > For scenario 1, I agree with you on what the changelog should emit. If DF1 
is in snapshot 3, then we should emit the row with PK1 being deleted by ED2. 
(The row is deleted by ED1 too, but we should only emit the row once, not 
twice, for snapshot 3).
   
   Do I understand correctly that the current code does not yet behave this way?
   
   > For scenario 2, I think that when a row in a data file is deleted by a 
positional delete in the same commit, that row should neither be shown as 
inserted nor as deleted. This is where I think we disagree. (IIUC, you expect 
to see it as deleted but not as inserted. To me, that would be inconsistent.) 
This part of scenario 2 is actually already tested by 
`testAddingAndDeletingInSameCommit`.
   
   I think we agree here. I'm perfectly fine if we can make sure that the added 
and immediately removed records are not emitted during the incremental scan 
read.
   
   > If you agree with my analysis, then my implementation does handle at least 
your two scenarios correctly.
   
   I think the output for snapshot3 in scenario1 is not correct. I have left a 
comment there.





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1725269441


##
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java:
##
@@ -191,39 +214,361 @@ public void testMixDeleteAndInsert() throws IOException {
     table.newAppend().appendFile(dataFile2).commit();
     long snapshotId3 = table.currentSnapshot().snapshotId();
 
-    CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = newScan().planTasks();
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testPositionDeletes() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile1.path(), 0L), // id = 29
+            Pair.of(dataFile1.path(), 3L), // id = 89
+            Pair.of(dataFile2.path(), 2L) // id = 122
+            );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            deletes);
+
+    table
+        .newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testEqualityDeletes() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("data", "a"), // id = 29
+            dataDelete.copy("data", "d"), // id = 89
+            dataDelete.copy("data", "g") // id = 122
+            );
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            dataDeletes,
+            deleteRowSchema);
+
+    table.newRowDelta().addDeletes(eqDeletes).commit();
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testMixOfPositionAndEqualityDeletes() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile1.path(), 0L), // id = 29
+            Pair.of(dataFile1.path(), 3L) // id = 89
+            );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            deletes);
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("data", "a"), // id = 29
+            dataDelete.copy("data", "g") // id = 122
+            );
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+t

Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-21 Thread via GitHub


pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1725175321



Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-19 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2297933397

   @pvary I have added `testFlinkScenario1` and `testFlinkScenario2` to 
`TestChangelogReader`. Please check the expected results. (I will rename the 
tests later with more descriptive names.)
   For scenario 1, I agree with you on what the changelog should emit. If DF1 
is in snapshot 3, then we should emit the row with PK1 being deleted by ED2. 
(The row is deleted by ED1 too, but we should only emit the row once, not 
twice, for snapshot 3).
   For scenario 2, I think that when a row in a data file is deleted by a 
positional delete in the same commit, that row should neither be shown as 
inserted nor as deleted. This is where I think we disagree. (IIUC, you expect 
to see it as deleted but not as inserted. To me, that would be inconsistent.) 
This part of scenario 2 is actually already tested by 
`testAddingAndDeletingInSameCommit`.
   If you agree with my analysis, then my implementation does handle at least 
your two scenarios correctly.
   
   





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-17 Thread via GitHub


pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2295126370

   > For the positional delete case, I do not believe deleting the same 
position in the same data file again can happen
   
   I agree that this should be the case. My only concern is that, until now, it 
didn't cause any issues. We might not have thought about it and didn't clarify 
this in the specification - so if we have a hard requirement here, we might 
want to spell it out in the spec too.
   
   > Just to clarify, in your scenario1, for example, does ED1 contain PK=PK1? 
In other words, PK1 is the value of the primary key in the table, right? and by 
V1 you simply mean the values of the other columns? And then ED2 again contains 
PK=PK1? so that you can update the other columns of the same row with V2, right?
   
   Yes, you understand correctly. That's what I was trying to describe.
   





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-17 Thread via GitHub


wypoon commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2294958003

   @pvary thank you for your interest and for the Flink scenarios, which are 
very helpful as I am unfamiliar with Flink.
   
   Regarding https://github.com/apache/iceberg/pull/9888, please read my 
comments there. I put up https://github.com/apache/iceberg/pull/10954 only as a 
reference for @manuzhang so he can see the tests I added which fail with the 
implementation of `BaseIncrementalChangelogScan` in 
https://github.com/apache/iceberg/pull/9888. 
https://github.com/apache/iceberg/pull/10954 is not really for consideration or 
review.
   
   Next week I'll look into the scenarios you listed and see what gaps there 
are in my implementation and add tests as necessary. Regarding 
`DeletedRowsScanTask`, when @aokolnychyi added the API, he figured that it was 
necessary to know what deletes existed before the snapshot and what deletes are 
added in the snapshot. I'll have to think through the equality delete case, but 
for the positional delete case, I believe that it is not necessary to know the 
existing deletes (which is the reason for my `// not used` comment). For the 
positional delete case, I do not believe deleting the same position in the same 
data file again can happen, so added deletes should always be new positions. 
Thus, we only need to scan the data file and emit the rows that are deleted by 
the added deletes (which I do by using the _pos metadata column and the 
`PositionDeleteIndex` a `DeleteFilter` constructs for the data file). The _pos 
metadata column is automatically added to the schema if there are any position 
delete files to be applied to the data file, and for a
`DeletedRowsScanTask`, the `DeleteFilter` is constructed with the added delete 
files (so those are the position delete files to be applied).
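   
   As a rough illustration of that last point (plain Java, with a simple Set 
of positions standing in for the `PositionDeleteIndex`; this is a sketch, not 
the PR's actual code):
   
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   public class PositionDeleteEmission {
   
     static class Row {
       final long pos; // stands in for the _pos metadata column
       final String data;
   
       Row(long pos, String data) {
         this.pos = pos;
         this.data = data;
       }
   
       @Override
       public String toString() {
         return data + "@" + pos;
       }
     }
   
     // Rows of a data file that become DELETE records for the scanned snapshot.
     // Only the index built from the snapshot's added position delete files is
     // consulted: a position cannot be deleted twice, so the existing deletes
     // never overlap with the added ones and need not be checked.
     static List<Row> deletedRowsToEmit(List<Row> dataFileRows, Set<Long> addedPositions) {
       return dataFileRows.stream()
           .filter(row -> addedPositions.contains(row.pos))
           .collect(Collectors.toList());
     }
   
     public static void main(String[] args) {
       List<Row> dataFile1 = List.of(new Row(0, "a"), new Row(1, "b"), new Row(3, "d"));
       System.out.println(deletedRowsToEmit(dataFile1, Set.of(0L, 3L))); // [a@0, d@3]
     }
   }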
   I admit that I hadn't thought through equality deletes carefully. Just to 
clarify, in your scenario1, for example, does ED1 contain `PK=PK1`? In other 
words, `PK1` is the value of the primary key in the table, right? And by `V1` 
you simply mean the values of the other columns? And then ED2 again contains 
`PK=PK1`, so that you can update the other columns of the same row with `V2`, 
right?
   





Re: [PR] Support changelog scan for table with delete files [iceberg]

2024-08-17 Thread via GitHub


pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2294754336

   @wypoon, @manuzhang: I'm interested in providing Flink streaming CDC reads. 
That would require working changelog scan planning in Iceberg core, so I would 
be happy to help with my reviews here. I'm not an expert on this part of the 
code yet, so if the PR becomes more complicated, we might have to ask for help 
from folks more experienced here, but I can do a first round of the reviews.
   
   I see that there are multiple PRs for this topic (#9888, #10954). I would 
suggest focusing the discussion on one specific PR.
   
   There are some tricky scenarios with Flink CDC, we need to handle:
   
   ---
   
   Scenario1:
   - 1st snapshot: A new record is written to the table with primary key 1 (PK1) and value 1 (V1)
     - Creates data file 1 (DF1) - with PK1-V1
   - 2nd snapshot: The record with PK1 is updated with value V2
     - Creates an equality delete 1 (ED1) - with PK1
     - Creates a new data file 2 (DF2) - with PK1-V2
   - 3rd snapshot: The record with PK1 is updated with value V3
     - Creates an equality delete 2 (ED2) - with PK1
     - Creates a new data file 3 (DF3) - with PK1-V3
   
   When the changelog scan reads the 3rd snapshot, it should consider:
   - Delete reads:
     - DF1 - omit records deleted by ED1, emit records deleted by ED2
     - DF2 - emit records deleted by ED2
   - Data read:
     - DF3 - emit all records
   
   Notice that for DF1 we should not emit records which are deleted by previous 
deletes. So I am concerned about your `// not used` comment 
[here](https://github.com/apache/iceberg/pull/10935/files#diff-24a8b4b8a16e20a7727ca9b04754fa00cbb40228aa79b3e63baf958aef7bec04R266)
 😄
   
   ---
   
   Scenario2:
   - 1st snapshot: A new record is written to the table with primary key 1 (PK1) and value 1 (V1)
     - Creates data file 1 (DF1) - with PK1-V1
   - 2nd snapshot: The record with PK1 is updated with value V2, but later updated again with value V3
     - Creates an equality delete 1 (ED1) - with PK1
     - Creates a new data file 2 (DF2) - with PK1-V2
     - Creates a positional delete 1 (PD1) - for the DF2-PK1
     - Here we have 2 possibilities:
       - Adds a new line to DF2 with PK1-V3 - if the data file target file size is not reached yet
       - Creates a new data file 3 (DF3) - with PK1-V3 - if the data file is already rolled over
   
   When the changelog scan reads the 2nd snapshot, it should consider:
   - Delete reads:
     - DF1 - emit records deleted by ED1 - the emitted record is: D(PK1-V1)
     - DF2 - emit records deleted by PD1 - the emitted record is: D(PK1-V2)
   - Data read:
     - DF2 - omit records deleted by PD1 - the emitted record is: I(PK1-V3), if PK1-V3 was appended to DF2
     - DF3 - emit all records - the emitted record is: I(PK1-V3), if PK1-V3 was written to DF3
   
   Notice that the order of the records is important, and it is not trivial to 
maintain that order if the files are read in a distributed way.
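   
   For reference, the commits in scenario1 could be set up in a test roughly 
like the tests quoted earlier in this thread do it. This is only a sketch: 
df1/df2/df3 are assumed data-file fixtures containing PK1-V1/V2/V3, and "pk" 
is an assumed column name for the primary key; the same imports and fixtures 
as the quoted TestChangelogReader tests are assumed.
   
   // Sketch only: df1/df2/df3 and the "pk" column are assumptions; the
   // helpers follow the usage shown in the quoted tests.
   Schema deleteRowSchema = table.schema().select("pk");
   Record pkDelete = GenericRecord.create(deleteRowSchema);
   
   // 1st snapshot: DF1 with PK1-V1
   table.newAppend().appendFile(df1).commit();
   
   // 2nd snapshot: ED1 deleting PK1, committed together with DF2 (PK1-V2)
   DeleteFile ed1 =
       FileHelpers.writeDeleteFile(
           table,
           Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
           TestHelpers.Row.of(0),
           Lists.newArrayList(pkDelete.copy("pk", "PK1")),
           deleteRowSchema);
   table.newRowDelta().addRows(df2).addDeletes(ed1).commit();
   
   // 3rd snapshot: ED2 deleting PK1 again, committed together with DF3 (PK1-V3)
   DeleteFile ed2 =
       FileHelpers.writeDeleteFile(
           table,
           Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
           TestHelpers.Row.of(0),
           Lists.newArrayList(pkDelete.copy("pk", "PK1")),
           deleteRowSchema);
   table.newRowDelta().addRows(df3).addDeletes(ed2).commit();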
   
   CC: @dramaticlly 




