Re: [PR] Core: Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
danielcweeks merged PR #15653:
URL: https://github.com/apache/iceberg/pull/15653
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
anoopj commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r3001794037
##
core/src/test/java/org/apache/iceberg/TestRowDelta.java:
##
@@ -2187,6 +2189,133 @@ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
     assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
   }
+  @TestTemplate
+  public void testConcurrentDVsInDifferentPartitionsWithFilter() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // bucket16("u") -> 0, bucket16("a") -> 2
+    DataFile dataFileInBucket0 = newDataFile("data_bucket=0");
+    DataFile dataFileInBucket2 = newDataFile("data_bucket=2");
+    commit(
+        table, table.newRowDelta().addRows(dataFileInBucket0).addRows(dataFileInBucket2), branch);
+
+    Snapshot base = latestSnapshot(table, branch);
+
+    // prepare a DV for bucket 0 with a conflict detection filter scoped to bucket 0
+    DeleteFile dvBucket0 = newDeletes(dataFileInBucket0);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(dvBucket0)
+            .validateFromSnapshot(base.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently commit a DV in bucket 2
+    DeleteFile dvBucket2 = newDeletes(dataFileInBucket2);
+    commit(table, table.newRowDelta().addDeletes(dvBucket2), branch);
+
+    // commit should succeed because the concurrent DV is in bucket 2
+    // which does not overlap the conflict detection filter
+    commit(table, rowDelta, branch);
Review Comment:
Good idea. Done.
##
core/src/test/java/org/apache/iceberg/TestRowDelta.java:
##
@@ -2187,6 +2189,133 @@ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
     assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
   }
+  @TestTemplate
+  public void testConcurrentDVsInDifferentPartitionsWithFilter() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // bucket16("u") -> 0, bucket16("a") -> 2
+    DataFile dataFileInBucket0 = newDataFile("data_bucket=0");
+    DataFile dataFileInBucket2 = newDataFile("data_bucket=2");
+    commit(
+        table, table.newRowDelta().addRows(dataFileInBucket0).addRows(dataFileInBucket2), branch);
+
+    Snapshot base = latestSnapshot(table, branch);
+
+    // prepare a DV for bucket 0 with a conflict detection filter scoped to bucket 0
+    DeleteFile dvBucket0 = newDeletes(dataFileInBucket0);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(dvBucket0)
+            .validateFromSnapshot(base.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently commit a DV in bucket 2
+    DeleteFile dvBucket2 = newDeletes(dataFileInBucket2);
+    commit(table, table.newRowDelta().addDeletes(dvBucket2), branch);
+
+    // commit should succeed because the concurrent DV is in bucket 2
+    // which does not overlap the conflict detection filter
+    commit(table, rowDelta, branch);
+  }
+
+  @TestTemplate
+  public void testConcurrentDVsInSamePartitionWithFilter() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // bucket16("u") -> 0
+    DataFile dataFile = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+    Snapshot base = latestSnapshot(table, branch);
+
+    // prepare a DV for dataFile with a conflict detection filter scoped to bucket 0
+    DeleteFile dv1 = newDeletes(dataFile);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(dv1)
+            .validateFromSnapshot(base.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently commit another DV for the same data file in bucket 0
+    DeleteFile dv2 = newDeletes(dataFile);
+    commit(table, table.newRowDelta().addDeletes(dv2), branch);
+
+    // must be conflict because the concurrent DV is in the same partition
+    assertThatThrownBy(() -> commit(table, rowDelta, branch))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Found concurrently added DV for %s", dataFile.location());
+  }
+
+  @TestTemplate
+  public void testDVValidationPartitionPruningManifestCount() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // disable manifest merging so each commit produces a separate delete manifest
+    table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit();
+
+    // create data files and DVs across 10 different partitions (buckets 0-9)
+    int numPartitions = 10;
+    DataFile[] dataFiles = new DataFile[numPartitions];
+    for (int bucket = 0; bucket < numPartitions; bucket++) {
+      dataFiles[bucket] = newDataFile("data_bucket=" + bucket);
+      c
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
amogh-jahagirdar commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r3001640121
##
core/src/test/java/org/apache/iceberg/TestRowDelta.java:
##
@@ -2187,6 +2189,133 @@ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
     assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
   }
+  @TestTemplate
+  public void testConcurrentDVsInDifferentPartitionsWithFilter() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // bucket16("u") -> 0, bucket16("a") -> 2
+    DataFile dataFileInBucket0 = newDataFile("data_bucket=0");
+    DataFile dataFileInBucket2 = newDataFile("data_bucket=2");
+    commit(
+        table, table.newRowDelta().addRows(dataFileInBucket0).addRows(dataFileInBucket2), branch);
+
+    Snapshot base = latestSnapshot(table, branch);
+
+    // prepare a DV for bucket 0 with a conflict detection filter scoped to bucket 0
+    DeleteFile dvBucket0 = newDeletes(dataFileInBucket0);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(dvBucket0)
+            .validateFromSnapshot(base.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently commit a DV in bucket 2
+    DeleteFile dvBucket2 = newDeletes(dataFileInBucket2);
+    commit(table, table.newRowDelta().addDeletes(dvBucket2), branch);
+
+    // commit should succeed because the concurrent DV is in bucket 2
+    // which does not overlap the conflict detection filter
+    commit(table, rowDelta, branch);
Review Comment:
I think we need stronger assertions in this test to make sure the DV
metadata is what we expect. Check out the `validateManifest` helper that's used
in the other unit tests in this class.
##
core/src/test/java/org/apache/iceberg/TestRowDelta.java:
##
@@ -2187,6 +2189,133 @@ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
     assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
   }
+  @TestTemplate
+  public void testConcurrentDVsInDifferentPartitionsWithFilter() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // bucket16("u") -> 0, bucket16("a") -> 2
+    DataFile dataFileInBucket0 = newDataFile("data_bucket=0");
+    DataFile dataFileInBucket2 = newDataFile("data_bucket=2");
+    commit(
+        table, table.newRowDelta().addRows(dataFileInBucket0).addRows(dataFileInBucket2), branch);
+
+    Snapshot base = latestSnapshot(table, branch);
+
+    // prepare a DV for bucket 0 with a conflict detection filter scoped to bucket 0
+    DeleteFile dvBucket0 = newDeletes(dataFileInBucket0);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(dvBucket0)
+            .validateFromSnapshot(base.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently commit a DV in bucket 2
+    DeleteFile dvBucket2 = newDeletes(dataFileInBucket2);
+    commit(table, table.newRowDelta().addDeletes(dvBucket2), branch);
+
+    // commit should succeed because the concurrent DV is in bucket 2
+    // which does not overlap the conflict detection filter
+    commit(table, rowDelta, branch);
+  }
+
+  @TestTemplate
+  public void testConcurrentDVsInSamePartitionWithFilter() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // bucket16("u") -> 0
+    DataFile dataFile = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+    Snapshot base = latestSnapshot(table, branch);
+
+    // prepare a DV for dataFile with a conflict detection filter scoped to bucket 0
+    DeleteFile dv1 = newDeletes(dataFile);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(dv1)
+            .validateFromSnapshot(base.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently commit another DV for the same data file in bucket 0
+    DeleteFile dv2 = newDeletes(dataFile);
+    commit(table, table.newRowDelta().addDeletes(dv2), branch);
+
+    // must be conflict because the concurrent DV is in the same partition
+    assertThatThrownBy(() -> commit(table, rowDelta, branch))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Found concurrently added DV for %s", dataFile.location());
+  }
+
+  @TestTemplate
+  public void testDVValidationPartitionPruningManifestCount() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // disable manifest merging so each commit produces a separate delete manifest
+    table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit();
+
+    // create data files and DVs across 10 different partitions (buckets 0-9)
+    int numPartitions = 10;
+    DataFi
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
danielcweeks commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r2997397362
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -836,7 +838,10 @@ protected void validateAddedDVs(
List newDeleteManifests = history.first();
Set newSnapshotIds = history.second();
-Tasks.foreach(newDeleteManifests)
+Iterable matchingManifests =
Review Comment:
I also validated this with Spark, and it does appear that even if the
partition spec is updated, any DV produced will be placed in a manifest with
the spec ID of the associated data file.
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
anoopj commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r2991907491
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -866,6 +871,38 @@ private void validateAddedDVs(
     }
   }
+  private Iterable filterManifestsByPartition(
+      TableMetadata base, Expression conflictDetectionFilter, List manifests) {
+    if (conflictDetectionFilter == null || conflictDetectionFilter == Expressions.alwaysTrue()) {
+      return manifests;
+    }
+
+    // if any concurrent manifest was written with a different partition spec, skip pruning
+    // to avoid incorrectly excluding manifests when a spec change happened during validation
+    int defaultSpecId = base.defaultSpecId();
+    if (manifests.stream().anyMatch(m -> m.partitionSpecId() != defaultSpecId)) {
+      return manifests;
+    }
+
+    Map specsById = base.specsById();
+    Map evaluators = Maps.newHashMap();
+    return Iterables.filter(
Review Comment:
That is a good idea. Will add it.
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
amogh-jahagirdar commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r2991663581
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -866,6 +871,38 @@ private void validateAddedDVs(
     }
   }
+  private Iterable filterManifestsByPartition(
+      TableMetadata base, Expression conflictDetectionFilter, List manifests) {
+    if (conflictDetectionFilter == null || conflictDetectionFilter == Expressions.alwaysTrue()) {
+      return manifests;
+    }
+
+    // if any concurrent manifest was written with a different partition spec, skip pruning
+    // to avoid incorrectly excluding manifests when a spec change happened during validation
+    int defaultSpecId = base.defaultSpecId();
+    if (manifests.stream().anyMatch(m -> m.partitionSpecId() != defaultSpecId)) {
+      return manifests;
+    }
+
+    Map specsById = base.specsById();
+    Map evaluators = Maps.newHashMap();
+    return Iterables.filter(
Review Comment:
Should this also filter on `manifest.hasAddedFiles()`? That way we don't
have to bother opening manifests that were just the result of a metadata
compaction (those contain only existing files). If so, we may also want to
rename the method or pass in an additional predicate.
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -836,7 +838,10 @@ protected void validateAddedDVs(
List newDeleteManifests = history.first();
Set newSnapshotIds = history.second();
-Tasks.foreach(newDeleteManifests)
+Iterable matchingManifests =
Review Comment:
As a side note, I think there are cases where we can just merge the
conflicting added DVs rather than fail. We can take that on in a follow-up, since
this PR is an improvement over the current behavior. Just mentioning it since
that may impact how we want to structure all this logic.
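A rough illustration of what "merging" two DVs for the same data file could mean, under loud assumptions: a DV is modeled here as a `java.util.BitSet` of deleted row positions, and `DvMergeSketch` is a hypothetical name. This thread does not show Iceberg's actual DV encoding or merge code, so treat this only as a sketch of the union semantics.

```java
import java.util.BitSet;

// Hypothetical sketch: a deletion vector is modeled as a BitSet of deleted row
// positions. This is an assumption for illustration, not Iceberg's DV format.
class DvMergeSketch {
  // Returns a new vector marking every position deleted by either input.
  // Neither input is modified.
  static BitSet merge(BitSet existing, BitSet concurrent) {
    BitSet merged = (BitSet) existing.clone();
    merged.or(concurrent);
    return merged;
  }

  public static void main(String[] args) {
    BitSet mine = new BitSet();
    mine.set(1);
    mine.set(5);

    BitSet theirs = new BitSet();
    theirs.set(5);
    theirs.set(9);

    System.out.println(merge(mine, theirs)); // prints {1, 5, 9}
  }
}
```

Whether a merge like this is safe in a given commit would still depend on the validation rules discussed in this thread; the sketch only covers the bitmap union.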
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -836,7 +838,10 @@ protected void validateAddedDVs(
List newDeleteManifests = history.first();
Set newSnapshotIds = history.second();
-Tasks.foreach(newDeleteManifests)
+Iterable matchingManifests =
Review Comment:
It's a bit unfortunate that ManifestGroup is tied to data files; it exposes a
lot of the primitives we need, so we wouldn't have to re-implement them.
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -866,6 +871,38 @@ private void validateAddedDVs(
     }
   }
+  private Iterable filterManifestsByPartition(
+      TableMetadata base, Expression conflictDetectionFilter, List manifests) {
+    if (conflictDetectionFilter == null || conflictDetectionFilter == Expressions.alwaysTrue()) {
+      return manifests;
+    }
+
+    // if any concurrent manifest was written with a different partition spec, skip pruning
+    // to avoid incorrectly excluding manifests when a spec change happened during validation
+    int defaultSpecId = base.defaultSpecId();
+    if (manifests.stream().anyMatch(m -> m.partitionSpecId() != defaultSpecId)) {
+      return manifests;
+    }
+
+    Map specsById = base.specsById();
+    Map evaluators = Maps.newHashMap();
+    return Iterables.filter(
Review Comment:
Or just keep it as is and add the filtering after the invocation of
`filterManifestsByPartition`; that's probably a cleaner separation of concerns.
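A minimal sketch of that layering, assuming simplified stand-ins: `ManifestInfo`, `filterByPartition`, and `manifestsToOpen` are hypothetical names, and the `Predicate` replaces the real partition evaluator. Only the idea of chaining a `hasAddedFiles()` check after the partition filter comes from the comments above.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: ManifestInfo stands in for Iceberg's ManifestFile and is
// not the real API. addedFilesCount models the manifest's added-file count.
class AddedFilesFilterSketch {
  static final class ManifestInfo {
    final String path;
    final int bucket;
    final int addedFilesCount;

    ManifestInfo(String path, int bucket, int addedFilesCount) {
      this.path = path;
      this.bucket = bucket;
      this.addedFilesCount = addedFilesCount;
    }

    boolean hasAddedFiles() {
      return addedFilesCount > 0;
    }
  }

  // Step 1: partition pruning only, kept as its own concern.
  static List<ManifestInfo> filterByPartition(
      List<ManifestInfo> manifests, Predicate<ManifestInfo> partitionFilter) {
    return manifests.stream().filter(partitionFilter).collect(Collectors.toList());
  }

  // Step 2: applied after pruning, drop manifests that added no files
  // (for example, ones that only carry existing entries after a rewrite).
  static List<ManifestInfo> manifestsToOpen(
      List<ManifestInfo> manifests, Predicate<ManifestInfo> partitionFilter) {
    return filterByPartition(manifests, partitionFilter).stream()
        .filter(ManifestInfo::hasAddedFiles)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<ManifestInfo> manifests =
        List.of(
            new ManifestInfo("added-b0.avro", 0, 3),
            new ManifestInfo("rewritten-b0.avro", 0, 0),
            new ManifestInfo("added-b2.avro", 2, 1));

    for (ManifestInfo m : manifestsToOpen(manifests, m -> m.bucket == 0)) {
      System.out.println(m.path); // prints added-b0.avro
    }
  }
}
```

Keeping the two steps separate means the partition-pruning helper keeps a single responsibility and its name stays accurate.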
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
anoopj commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r2991502926
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -836,7 +838,10 @@ protected void validateAddedDVs(
List newDeleteManifests = history.first();
Set newSnapshotIds = history.second();
-Tasks.foreach(newDeleteManifests)
+Iterable matchingManifests =
Review Comment:
I believe this scenario is forbidden by the Iceberg spec and the reference
implementation, i.e., a DV and its data file must have the same partition spec.
So even if the default spec changes, the DV will take the data file's spec.
> "The data file's partition (both spec and partition values) is equal [4] to
> the deletion vector's partition"
I added a guard to be defensive, though.
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
anoopj commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r2991491918
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -866,6 +871,25 @@ private void validateAddedDVs(
     }
   }
+  private Iterable filterManifestsByPartition(
+      TableMetadata base, Expression conflictDetectionFilter, List manifests) {
+    if (conflictDetectionFilter == null || conflictDetectionFilter == Expressions.alwaysTrue()) {
+      return manifests;
+    }
+
+    Map specsById = base.specsById();
+    return Iterables.filter(
+        manifests,
+        manifest -> {
+          PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+          Expression partitionFilter =
+              Projections.inclusive(spec, caseSensitive).project(conflictDetectionFilter);
+          ManifestEvaluator evaluator =
+              ManifestEvaluator.forPartitionFilter(partitionFilter, spec, caseSensitive);
Review Comment:
Good idea. Done.
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
danielcweeks commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r2990382041
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -836,7 +838,10 @@ protected void validateAddedDVs(
List newDeleteManifests = history.first();
Set newSnapshotIds = history.second();
-Tasks.foreach(newDeleteManifests)
+Iterable matchingManifests =
Review Comment:
This is somewhat nuanced, but I believe we need an additional level of
validation that no manifests between base and current have changed the
partition spec. If any snapshot was written with a different spec, it could
result in the partition filter improperly excluding a manifest and introducing
a duplicate DV.
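The guard requested here can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the PR's code: `ManifestInfo`, `pruneWithGuard`, and the `Predicate`-based filter are hypothetical stand-ins for `ManifestFile`, `filterManifestsByPartition`, and the partition evaluator. The shape follows the quoted hunks elsewhere in this thread: if any manifest was written with a spec other than the default, pruning is skipped entirely.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: ManifestInfo stands in for Iceberg's ManifestFile and the
// Predicate stands in for a partition-filter evaluator; neither is the real API.
class SpecGuardSketch {
  static final class ManifestInfo {
    final String path;
    final int specId;
    final int bucket;

    ManifestInfo(String path, int specId, int bucket) {
      this.path = path;
      this.specId = specId;
      this.bucket = bucket;
    }
  }

  // Prunes by partition only when every manifest uses the default spec;
  // otherwise returns the input unchanged so nothing is wrongly excluded.
  static List<ManifestInfo> pruneWithGuard(
      List<ManifestInfo> manifests, int defaultSpecId, Predicate<ManifestInfo> partitionFilter) {
    boolean otherSpecSeen = manifests.stream().anyMatch(m -> m.specId != defaultSpecId);
    if (otherSpecSeen) {
      return manifests;
    }
    return manifests.stream().filter(partitionFilter).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Predicate<ManifestInfo> bucket0Only = m -> m.bucket == 0;

    List<ManifestInfo> sameSpec =
        List.of(new ManifestInfo("a.avro", 0, 0), new ManifestInfo("b.avro", 0, 2));
    List<ManifestInfo> mixedSpecs =
        List.of(new ManifestInfo("a.avro", 0, 0), new ManifestInfo("c.avro", 1, 2));

    System.out.println(pruneWithGuard(sameSpec, 0, bucket0Only).size()); // prints 1
    System.out.println(pruneWithGuard(mixedSpecs, 0, bucket0Only).size()); // prints 2
  }
}
```

The guard trades some pruning opportunity for safety: a mixed-spec history falls back to validating every manifest, which is the pre-existing behavior.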
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
danielcweeks commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r2990403109
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -866,6 +871,25 @@ private void validateAddedDVs(
     }
   }
+  private Iterable filterManifestsByPartition(
+      TableMetadata base, Expression conflictDetectionFilter, List manifests) {
+    if (conflictDetectionFilter == null || conflictDetectionFilter == Expressions.alwaysTrue()) {
+      return manifests;
+    }
+
+    Map specsById = base.specsById();
+    return Iterables.filter(
+        manifests,
+        manifest -> {
+          PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+          Expression partitionFilter =
+              Projections.inclusive(spec, caseSensitive).project(conflictDetectionFilter);
+          ManifestEvaluator evaluator =
+              ManifestEvaluator.forPartitionFilter(partitionFilter, spec, caseSensitive);
Review Comment:
You might be able to pull this out of the loop and reuse the evaluator.
This is especially true if there's only one spec (common), but pre-creating the
filters would save recreating for every iteration.
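One way to read this suggestion is a per-spec cache filled lazily with `Map.computeIfAbsent`. The sketch below is an assumption-laden illustration, not the PR's code: `ManifestInfo`, `prune`, and the `Predicate`-based "evaluator" are hypothetical stand-ins for `ManifestFile` and `ManifestEvaluator`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: ManifestInfo and the Predicate-based "evaluator" stand in
// for Iceberg's ManifestFile and ManifestEvaluator; they are not the real API.
class EvaluatorCacheSketch {
  static final class ManifestInfo {
    final String path;
    final int specId;
    final int bucket;

    ManifestInfo(String path, int specId, int bucket) {
      this.path = path;
      this.specId = specId;
      this.bucket = bucket;
    }
  }

  // Builds at most one evaluator per partition spec ID and reuses it for every
  // manifest written with that spec.
  static List<ManifestInfo> prune(
      List<ManifestInfo> manifests,
      Function<Integer, Predicate<ManifestInfo>> evaluatorFactory) {
    Map<Integer, Predicate<ManifestInfo>> evaluators = new HashMap<>();
    List<ManifestInfo> matching = new ArrayList<>();
    for (ManifestInfo manifest : manifests) {
      Predicate<ManifestInfo> evaluator =
          evaluators.computeIfAbsent(manifest.specId, evaluatorFactory);
      if (evaluator.test(manifest)) {
        matching.add(manifest);
      }
    }
    return matching;
  }

  public static void main(String[] args) {
    int[] built = {0};
    Function<Integer, Predicate<ManifestInfo>> factory =
        specId -> {
          built[0]++;
          return m -> m.bucket == 0; // pretend the projected filter selects bucket 0
        };

    List<ManifestInfo> manifests =
        List.of(
            new ManifestInfo("m0.avro", 0, 0),
            new ManifestInfo("m1.avro", 0, 2),
            new ManifestInfo("m2.avro", 0, 0));

    List<ManifestInfo> kept = prune(manifests, factory);
    System.out.println(kept.size() + " manifests kept, " + built[0] + " evaluator(s) built");
  }
}
```

With a single spec, which the comment calls the common case, the factory runs once no matter how many manifests are checked.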
Re: [PR] [core] Add manifest partition pruning to DV validation in MergingSnapshotProducer [iceberg]
danielcweeks commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r2990382041
##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -836,7 +838,10 @@ protected void validateAddedDVs(
List newDeleteManifests = history.first();
Set newSnapshotIds = history.second();
-Tasks.foreach(newDeleteManifests)
+Iterable matchingManifests =
Review Comment:
This is somewhat nuanced, but I believe we need an additional level of
validation that no snapshots between base and current have changed the
partition spec. If any snapshot was written with a different spec, it could
result in the partition filter improperly excluding a manifest and introducing
a duplicate DV.
