Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on PR #14810: URL: https://github.com/apache/iceberg/pull/14810#issuecomment-3770854306 Thank you @pvary for merging the change and for your support and valuable feedback! Thank you @mxm for reviewing the change and assisting with the release planning! I appreciate your help. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on PR #14810: URL: https://github.com/apache/iceberg/pull/14810#issuecomment-3728339727 Merged to main. Huhh.. this was a big one! Thanks @aiborodin for finding the issue and for persisting through all these review rounds! Thanks @mxm for the review!
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary merged PR #14810: URL: https://github.com/apache/iceberg/pull/14810
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2674331992
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java:
##
@@ -46,26 +47,61 @@ public byte[] serialize(DynamicCommittable committable) throws IOException {
view.writeUTF(committable.jobId());
view.writeUTF(committable.operatorId());
view.writeLong(committable.checkpointId());
-view.writeInt(committable.manifest().length);
-view.write(committable.manifest());
+
+int numManifests = committable.manifests().length;
+view.writeInt(numManifests);
+for (int i = 0; i < numManifests; i++) {
+ byte[] manifest = committable.manifests()[i];
+ view.writeInt(manifest.length);
+ view.write(manifest);
+}
+
return out.toByteArray();
}
@Override
public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException {
-if (version == 1) {
- DataInputDeserializer view = new DataInputDeserializer(serialized);
- WriteTarget key = WriteTarget.deserializeFrom(view);
- String jobId = view.readUTF();
- String operatorId = view.readUTF();
- long checkpointId = view.readLong();
- int manifestLen = view.readInt();
- byte[] manifestBuf;
- manifestBuf = new byte[manifestLen];
- view.read(manifestBuf);
- return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId);
+if (version == VERSION_1) {
+ return deserializeV1(serialized);
+} else if (version == VERSION_2) {
+ return deserializeV2(serialized);
}
throw new IOException("Unrecognized version or corrupt state: " + version);
}
+
+ private DynamicCommittable deserializeV1(byte[] serialized) throws IOException {
+DataInputDeserializer view = new DataInputDeserializer(serialized);
+WriteTarget key = WriteTarget.deserializeFrom(view);
+String jobId = view.readUTF();
+String operatorId = view.readUTF();
+long checkpointId = view.readLong();
+int manifestLen = view.readInt();
+byte[] manifestBuf;
+manifestBuf = new byte[manifestLen];
Review Comment:
Done
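The V2 wire format in the diff above (a manifest count followed by length-prefixed manifest byte arrays, with version dispatch on deserialize) can be sketched in isolation. The following is a hedged, self-contained approximation: it uses plain java.io streams instead of Flink's DataOutputSerializer/DataInputDeserializer, and the class and method names are illustrative, not the PR's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class Main {
  static final int VERSION_2 = 2;

  // V2 layout sketch: manifest count, then (length, bytes) for each manifest.
  static byte[] serialize(byte[][] manifests) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream view = new DataOutputStream(out);
    view.writeInt(manifests.length);
    for (byte[] manifest : manifests) {
      view.writeInt(manifest.length);
      view.write(manifest);
    }
    return out.toByteArray();
  }

  // Version dispatch mirrors the deserialize(int version, byte[]) pattern in the diff.
  static byte[][] deserialize(int version, byte[] serialized) throws IOException {
    if (version != VERSION_2) {
      throw new IOException("Unrecognized version or corrupt state: " + version);
    }
    DataInputStream view = new DataInputStream(new ByteArrayInputStream(serialized));
    byte[][] manifests = new byte[view.readInt()][];
    for (int i = 0; i < manifests.length; i++) {
      manifests[i] = new byte[view.readInt()];
      // readFully guards against short reads, unlike a bare read(buf)
      view.readFully(manifests[i]);
    }
    return manifests;
  }

  public static void main(String[] args) throws IOException {
    byte[][] manifests = {"manifest-a".getBytes(), "manifest-b".getBytes()};
    byte[][] restored = deserialize(VERSION_2, serialize(manifests));
    System.out.println(restored.length + " " + new String(restored[1]));
  }
}
```

One detail worth noting: the V1 code quoted in the diff reads the buffer with `view.read(manifestBuf)`, which in general java.io semantics may return fewer bytes than requested; `readFully` (or Flink's DataInputDeserializer equivalent) avoids that class of bug.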
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2671922515
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java:
##
@@ -46,26 +47,61 @@ public byte[] serialize(DynamicCommittable committable) throws IOException {
view.writeUTF(committable.jobId());
view.writeUTF(committable.operatorId());
view.writeLong(committable.checkpointId());
-view.writeInt(committable.manifest().length);
-view.write(committable.manifest());
+
+int numManifests = committable.manifests().length;
+view.writeInt(numManifests);
+for (int i = 0; i < numManifests; i++) {
+ byte[] manifest = committable.manifests()[i];
+ view.writeInt(manifest.length);
+ view.write(manifest);
+}
+
return out.toByteArray();
}
@Override
public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException {
-if (version == 1) {
- DataInputDeserializer view = new DataInputDeserializer(serialized);
- WriteTarget key = WriteTarget.deserializeFrom(view);
- String jobId = view.readUTF();
- String operatorId = view.readUTF();
- long checkpointId = view.readLong();
- int manifestLen = view.readInt();
- byte[] manifestBuf;
- manifestBuf = new byte[manifestLen];
- view.read(manifestBuf);
- return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId);
+if (version == VERSION_1) {
+ return deserializeV1(serialized);
+} else if (version == VERSION_2) {
+ return deserializeV2(serialized);
}
throw new IOException("Unrecognized version or corrupt state: " + version);
}
+
+ private DynamicCommittable deserializeV1(byte[] serialized) throws IOException {
+DataInputDeserializer view = new DataInputDeserializer(serialized);
+WriteTarget key = WriteTarget.deserializeFrom(view);
+String jobId = view.readUTF();
+String operatorId = view.readUTF();
+long checkpointId = view.readLong();
+int manifestLen = view.readInt();
+byte[] manifestBuf;
+manifestBuf = new byte[manifestLen];
Review Comment:
nit: should we put this into a single line? And maybe add a new line before this?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810: URL: https://github.com/apache/iceberg/pull/14810#discussion_r2671078899 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -126,9 +125,14 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. Review Comment: I think it's fine to delay the List cleanup until the 1.12 release to keep things simple.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810: URL: https://github.com/apache/iceberg/pull/14810#discussion_r2667505543 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -126,9 +125,14 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. Review Comment: > Could we change the uid for the committer somehow? If we could do it, then we could enforce using a new state for the new committer. From the Flink code, the committable state is in the `CommitterOperator`, controlled by the `SinkTransformationTranslator` internally in Flink. So it seems we can't change the UID of the committer operator.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810: URL: https://github.com/apache/iceberg/pull/14810#discussion_r2665342879 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -126,9 +125,14 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. Review Comment: > > Could we change the uid for the committer somehow? If we could do it, then we could enforce using a new state for the new committer. > > This wouldn't really improve the state migration process because there will still be unmapped old state during migration. It would cause a failure if the job is restarted without setting `allowNonRestoredState`, which might prevent users from accidentally forgetting to set this config.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
mxm commented on code in PR #14810: URL: https://github.com/apache/iceberg/pull/14810#discussion_r2664830343 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -126,9 +125,14 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. Review Comment: I think it is acceptable to advise users to upgrade their applications to 1.11.0 via a stop-with-savepoint. This note would have to go into an official migration guide. > Could we change the uid for the committer somehow? If we could do it, then we could enforce using a new state for the new committer. This wouldn't really improve the state migration process because there will still be unmapped old state during migration. > If we simplify the code and remove lists in this change, the committer state would still be compatible and have the same operator ID That would be nice if it doesn't introduce too much additional complexity.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810: URL: https://github.com/apache/iceberg/pull/14810#discussion_r2664410859 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -126,9 +125,14 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. Review Comment: > If we simplify the code and remove lists in this change, the committer state would still be compatible and have the same operator ID Could we change the `uid` for the committer somehow? If we could do it, then we could enforce using a new state for the new committer. > An alternative, potentially simpler solution would be to release this change in another patch release: 1.10.2. I still think that this change is not something we should release in a patch release. So if we go down this road, then we should keep this change in 1.11, and drop the code in 1.12.0.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810: URL: https://github.com/apache/iceberg/pull/14810#discussion_r2663979684 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -126,9 +125,14 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. Review Comment: We can do this. The only caveat is that I don't know how to programmatically force users to upgrade with `stop-with-savepoint`. If we simplify the code and remove lists in this change, the committer state would still be compatible and have the same operator ID. So if users upgrade without a prior `stop-with-savepoint`, the committer would skip some committables if there were more than one in the state per (table, branch, checkpoint). So we would need some way to prevent accidental upgrades without a prior `stop-with-savepoint`. An alternative, potentially simpler solution would be to release this change in another patch release: 1.10.2. It would allow us to have a version that automatically migrates the state. We could then remove lists as a separate change in 1.11.0 and add logic to detect the old state with multiple committables per (table, branch, checkpoint), which would fail the job and ask users to upgrade to the previous version first (1.10.2) to migrate the state. What do you think?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810: URL: https://github.com/apache/iceberg/pull/14810#discussion_r2661544342 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -126,9 +125,14 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. Review Comment: Please help me refresh where we are related to this comment. A few things: - 1.10.1 is released, but (because of my questions) this PR did not make it there - 1.11.0 is the next planned release - If I understand correctly, since the Dynamic Sink is experimental, we can be a bit more lax with the state restore expectations @aiborodin, @mxm, @Guosmilesmile: What do you think about simplifying the code and forcing users to upgrade with `stop-with-savepoint` and restore with `allowNonRestoredState`, and documenting it in the release notes?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810: URL: https://github.com/apache/iceberg/pull/14810#discussion_r2633639005 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -126,9 +125,14 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. Review Comment: Releasing this change in the next patch release (e.g. 1.10.1) would allow us to address the following points: 1. Fix Flink committable metrics: currently `flink.taskmanager.job.task.operator.successfulCommittables` and similar report incorrect values due to multiple committables coming into the committer, but only the aggregated one getting committed. 2. Avoid writing extra manifests: the sink would only write one temporary manifest per (table, branch, partition spec), while currently it writes a new temporary manifest for each unique WriteTarget (table, branch, schemaId, specId, upsertMode, equalityFields). It would also allow us to simplify the code a lot, as we could remove lists in the next minor release (1.11).
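As a toy illustration of the invariant being discussed here, a single aggregated commit per (table, branch, checkpoint), one can group per-WriteTarget results by a composite key before committing. All names below are illustrative assumptions; this is not the PR's DynamicCommitter or DynamicWriteResultAggregator code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class Main {
  // Illustrative composite key; the PR's real classes (WriteTarget, TableKey) differ.
  record CommitKey(String table, String branch, long checkpointId) {}

  // Collapse many write results into one entry per (table, branch, checkpoint),
  // so the committer sees a single aggregated commit request per triplet.
  static Map<CommitKey, List<byte[]>> aggregate(List<Map.Entry<CommitKey, byte[]>> results) {
    Map<CommitKey, List<byte[]>> byKey = new LinkedHashMap<>();
    for (Map.Entry<CommitKey, byte[]> result : results) {
      byKey.computeIfAbsent(result.getKey(), k -> new ArrayList<>()).add(result.getValue());
    }
    return byKey;
  }

  public static void main(String[] args) {
    CommitKey key = new CommitKey("db.tbl", "main", 7L);
    List<Map.Entry<CommitKey, byte[]>> results = List.of(
        Map.entry(key, "manifest-1".getBytes()),
        Map.entry(key, "manifest-2".getBytes()));
    Map<CommitKey, List<byte[]>> aggregated = aggregate(results);
    // Two write results, but only one commit entry for the triplet.
    System.out.println(aggregated.size() + " " + aggregated.get(key).size());
  }
}
```

This also shows why the committer metrics discussed above were misleading before the refactor: multiple committables reach the committer for one key, yet only one aggregated commit happens.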
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2633636208
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+class TableKey implements Serializable {
Review Comment:
Acknowledged
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java:
##
@@ -46,26 +47,59 @@ public byte[] serialize(DynamicCommittable committable) throws IOException {
view.writeUTF(committable.jobId());
view.writeUTF(committable.operatorId());
view.writeLong(committable.checkpointId());
-view.writeInt(committable.manifest().length);
-view.write(committable.manifest());
+
+view.writeInt(committable.manifests().length);
+for (int i = 0; i < committable.manifests().length; i++) {
+ view.writeInt(committable.manifests()[i].length);
+ view.write(committable.manifests()[i]);
+}
Review Comment:
Acknowledged
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
mxm commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2630631249
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java:
##
@@ -46,26 +47,59 @@ public byte[] serialize(DynamicCommittable committable) throws IOException {
view.writeUTF(committable.jobId());
view.writeUTF(committable.operatorId());
view.writeLong(committable.checkpointId());
-view.writeInt(committable.manifest().length);
-view.write(committable.manifest());
+
+view.writeInt(committable.manifests().length);
+for (int i = 0; i < committable.manifests().length; i++) {
+ view.writeInt(committable.manifests()[i].length);
+ view.write(committable.manifests()[i]);
+}
Review Comment:
I prefer this for readability:
```suggestion
int numManifests = committable.manifests().length;
view.writeInt(numManifests);
for (int i = 0; i < numManifests; i++) {
byte[] manifest = committable.manifests()[i];
view.writeInt(manifest.length);
view.write(manifest);
}
```
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+class TableKey implements Serializable {
Review Comment:
```suggestion
class TableKey {
```
This shouldn't be using Java serialization, as we have our own serialization
logic in this class.
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void
commit(Collection> commitRequests)
return;
}
-// For every table and every checkpoint, we store the list of
to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have
not been committed yet.
+/*
+ Each (table, branch, checkpoint) triplet must have only one commit
request.
+ There may be commit requests from previous checkpoints which have not
been committed yet.
+
+ We currently keep a List of commit requests per checkpoint instead of a
single CommitRequest
+ to process the Flink state from previous releases, which had multiple
commit requests due to a bug in the upstream
+ DynamicWriteResultAggregator. We should replace this with a single
commit request in the next major release.
Review Comment:
Is it strictly necessary to merge this change to 1.10.1? The original bug
which motivated this change has already been fixed in 1.10.1. The changes here
are great, but I think they can wait until 1.11.0.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2625993848
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +213,32 @@ private void commitPendingRequests(
NavigableMap> pendingResults = Maps.newTreeMap();
for (Map.Entry>> e :
commitRequestMap.entrySet()) {
for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA,
committable.getCommittable().manifest())) {
Review Comment:
Done
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2622378706
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +213,32 @@ private void commitPendingRequests(
NavigableMap> pendingResults = Maps.newTreeMap();
for (Map.Entry>> e :
commitRequestMap.entrySet()) {
for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA,
committable.getCommittable().manifest())) {
Review Comment:
If we remove `EMPTY_MANIFEST_DATA` handling (as it is not really used), then
we should remove
```
static final String MAX_CONTINUOUS_EMPTY_COMMITS =
"flink.max-continuous-empty-commits";
[..]
private final Map maxContinuousEmptyCommitsMap;
private final Map continuousEmptyCheckpointsMap;
```
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2622366781
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -57,11 +58,11 @@ class DynamicWriteResultAggregator
implements OneInputStreamOperator<
CommittableMessage,
CommittableMessage> {
private static final Logger LOG =
LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
- private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+ private static final byte[][] EMPTY_MANIFEST_DATA = new byte[0][];
Review Comment:
Discussed it with @mxm, and we probably can remove the whole
`EMPTY_MANIFEST_DATA` stuff from the `DynamicIcebergSink`
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2622049013
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +119,15 @@ public void
commit(Collection> commitRequests)
return;
}
-// For every table and every checkpoint, we store the list of
to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have
not been committed yet.
+/*
+ Each (table, branch, checkpoint) triplet must have only one commit
request.
+ There may be commit requests from previous checkpoints which have not
been committed yet.
+
+ We currently keep a List of commit requests per checkpoint instead of a
single CommitRequest
+ to process the Flink state from previous releases, which had multiple
commit requests due to a bug in the
+ upstream DynamicWriteResultAggregator. Iceberg 1.11.0 will remove this,
and users should upgrade to the latest
Review Comment:
Acknowledged
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -57,11 +58,11 @@ class DynamicWriteResultAggregator
implements OneInputStreamOperator<
CommittableMessage,
CommittableMessage> {
private static final Logger LOG =
LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
- private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+ private static final byte[][] EMPTY_MANIFEST_DATA = new byte[0][];
Review Comment:
Good point. It appears that we don't need this because we use a Map in the
dynamic sink aggregator, which can't have an empty list of `WriteResults`. I
think it was ported from the `IcebergWriteAggregator`. I removed it.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2619051403
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -57,11 +58,11 @@ class DynamicWriteResultAggregator
implements OneInputStreamOperator<
CommittableMessage,
CommittableMessage> {
private static final Logger LOG =
LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
- private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+ private static final byte[][] EMPTY_MANIFEST_DATA = new byte[0][];
Review Comment:
QQ: When can we have empty manifest data? Do we need it with the Dynamic
Sink?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2618965830
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +119,15 @@ public void commit(Collection> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the + upstream DynamicWriteResultAggregator. Iceberg 1.11.0 will remove this, and users should upgrade to the latest
Review Comment:
This is just a technicality, but the bug is not there. Maybe:
```
to process the Flink state from previous releases, which had multiple commit requests in the upstream DynamicWriteResultAggregator.
```
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2618592470
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.
Review Comment:
I agree that we can discuss the removal separately. The comment contained the timeline; that is why I asked for your thoughts here.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2618007875
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.
Review Comment:
@pvary We can keep lists of committables for a longer period. Removing them from DynamicCommitter is a separate step, independent from this change. The current change is fully backward compatible.
Although I'd prefer to remove the lists of committables early, in the next minor release, 1.11, because they create unnecessary complications and pollute the code, making it harder to reason about.
Additionally, we will still provide users with a smooth migration path. Users can upgrade to patch version 1.10.1 to resolve their current state and then upgrade to the latest minor version: 1.11+. We can always detect multiple committables in the `DynamicCommitter` state and ask users to upgrade to the previous patch version first.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2609629056
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.
Review Comment:
I see the following timeline for the upgrade and the state migration:
1. We release this fix in the next Iceberg patch version - 1.10.1. It allows users to pick up the new change quickly and migrate their state to a single committable per table/branch/checkpoint.
2. We remove the support of lists in the next minor release: 1.11.0.
This timeframe gives users enough time for a smooth migration. I updated the comment to reflect this.
What do you think?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2613353949
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.
Review Comment:
With the old Sink we had the following guarantees:
- You were able to upgrade between Iceberg minor versions without any issue
- Flink version upgrade needed an application stop-start. In this case the committable state was cleaned (notifyCheckpointComplete was run), and these incompatibilities were not an issue.
With the new SinkV2 API, I'm not sure we have any guarantees for cleaning up the committables from the state, but if this is so, then we might need to keep these things for a longer period.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2612912900
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
return equalityFields;
}
- void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
- view.writeInt(equalityField);
-}
- }
-
static WriteTarget deserializeFrom(DataInputView view) throws IOException {
Review Comment:
We can still keep this to support upgrading simple cases directly into the
new minor version 1.11+ (where there's a single V1 committable per table,
branch, checkpoint in the Flink state). It would work even after we remove
lists of committables in DynamicCommitter.
Other use cases would upgrade to the new patch version first (1.10.1) to fix
their state. We can also throw an exception in the DynamicCommitter if we
detect this and ask users to upgrade to 1.10.1 first.
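The migration path discussed above (keep reading V1 single-manifest state while new state carries a manifest list) is typically handled by dispatching on a serializer version tag. A hypothetical sketch under that assumption; `VersionedCommittableReader`, its version constants, and the use of `java.io` streams instead of Flink's `DataInputView` are all illustrative, not the actual `DynamicCommittableSerializer` API:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical version-dispatching reader: V1 state stored one manifest per
// committable; V2 stores a length-prefixed manifest list.
public class VersionedCommittableReader {
  static final int V1 = 1;
  static final int V2 = 2;

  static byte[][] readManifests(int version, DataInputStream view) throws IOException {
    switch (version) {
      case V1: {
        // Legacy layout: a single length-prefixed manifest, wrapped into a list
        byte[] single = new byte[view.readInt()];
        view.readFully(single);
        return new byte[][] {single};
      }
      case V2: {
        // Current layout: manifest count, then each manifest length-prefixed
        byte[][] manifests = new byte[view.readInt()][];
        for (int i = 0; i < manifests.length; i++) {
          manifests[i] = new byte[view.readInt()];
          view.readFully(manifests[i]);
        }
        return manifests;
      }
      default:
        throw new IOException("Unknown committable version: " + version);
    }
  }

  public static void main(String[] args) throws IOException {
    // A V1 payload: big-endian int length 2, then bytes {7, 8}
    byte[] v1 = {0, 0, 0, 2, 7, 8};
    byte[][] manifests = readManifests(V1, new DataInputStream(new ByteArrayInputStream(v1)));
    System.out.println(manifests.length + " manifest(s), first has "
        + manifests[0].length + " bytes"); // prints: 1 manifest(s), first has 2 bytes
  }
}
```

Dropping the `case V1` branch later (and failing fast on unknown versions) is what removing the legacy-state support would amount to.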
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2611439580
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
return equalityFields;
}
- void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
- view.writeInt(equalityField);
-}
- }
-
static WriteTarget deserializeFrom(DataInputView view) throws IOException {
Review Comment:
So we can deprecate this too, and it will be removed as soon as we don't
have any old DynamicCommittable?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2609631494
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
return equalityFields;
}
- void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
- view.writeInt(equalityField);
-}
- }
-
Review Comment:
We don't use this in production because we don't serialise the WriteTarget.
We still need the deserialise method, though, to read V1 committables.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2609630549
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java:
##
@@ -41,6 +41,7 @@ public byte[] serialize(DynamicWriteResult writeResult)
throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
writeResult.key().serializeTo(view);
+view.writeInt(writeResult.specId());
Review Comment:
WriteResults are serialized only at runtime for communication between
writers and the aggregator, so this will work.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2609629056
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void
commit(Collection> commitRequests)
return;
}
-// For every table and every checkpoint, we store the list of
to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have
not been committed yet.
+/*
+ Each (table, branch, checkpoint) triplet must have only one commit
request.
+ There may be commit requests from previous checkpoints which have not
been committed yet.
+
+ We currently keep a List of commit requests per checkpoint instead of a
single CommitRequest
+ to process the Flink state from previous releases, which had multiple
commit requests due to a bug in the upstream
+ DynamicWriteResultAggregator. We should replace this with a single
commit request in the next major release.
Review Comment:
I see the following timeline for the upgrade and the state migration:
1. We release this fix in the next Iceberg patch version - 1.10.1. It allows
users to pick up the new change quickly and migrate their state to a single
committable per table/branch/checkpoint.
2. We remove the support of lists in the next minor release: 1.11.0.
This timeframe gives users enough time for a smooth migration. I updated the
comment to reflect this.
What do you think?
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +218,40 @@ private void commitPendingRequests(
NavigableMap> pendingResults = Maps.newTreeMap();
for (Map.Entry>> e :
commitRequestMap.entrySet()) {
for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA,
committable.getCommittable().manifest())) {
+if (Arrays.deepEquals(EMPTY_MANIFEST_DATA,
committable.getCommittable().manifests())) {
Review Comment:
Done
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606098030
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
return equalityFields;
}
- void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
- view.writeInt(equalityField);
-}
- }
-
Review Comment:
Why do we remove this?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606087020
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java:
##
@@ -41,6 +41,7 @@ public byte[] serialize(DynamicWriteResult writeResult)
throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
writeResult.key().serializeTo(view);
+view.writeInt(writeResult.specId());
Review Comment:
Do we need to have a new version? WriteResults serialized with the old
version will not be readable with the new serializer
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606068307
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +218,40 @@ private void commitPendingRequests(
NavigableMap<Long, List<WriteResult>> pendingResults = Maps.newTreeMap();
for (Map.Entry<Long, List<CommitRequest<DynamicCommittable>>> e :
commitRequestMap.entrySet()) {
for (CommitRequest<DynamicCommittable> committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) {
+if (Arrays.deepEquals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifests())) {
Review Comment:
Could we just check for `committable.getCommittable().manifests().isEmpty()`?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606065163
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +218,40 @@ private void commitPendingRequests(
NavigableMap<Long, List<WriteResult>> pendingResults = Maps.newTreeMap();
for (Map.Entry<Long, List<CommitRequest<DynamicCommittable>>> e :
commitRequestMap.entrySet()) {
for (CommitRequest<DynamicCommittable> committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) {
+if (Arrays.deepEquals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifests())) {
Review Comment:
Maybe, it is ok.
NVM
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606020361
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +218,40 @@ private void commitPendingRequests(
NavigableMap<Long, List<WriteResult>> pendingResults = Maps.newTreeMap();
for (Map.Entry<Long, List<CommitRequest<DynamicCommittable>>> e :
commitRequestMap.entrySet()) {
for (CommitRequest<DynamicCommittable> committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) {
+if (Arrays.deepEquals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifests())) {
Review Comment:
This logic is not good in this form. We need to combine it with the `manifests`
loop below.
The end goal we want to achieve:
- Every manifest which is not `EMPTY_MANIFEST_DATA` should be in the
`pendingResults`
- If the `pendingResults` is empty, then put a single `EMPTY_MANIFEST_DATA`
into it
We shouldn't put `EMPTY_MANIFEST_DATA` into the `pendingResults`.
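The aggregation rule above can be sketched with plain collections. This is a simplification, not the real committer code: manifests are plain strings here rather than serialized `DeltaManifests`, and the empty-checkpoint fallback keeps a single empty entry instead of the real `EMPTY_MANIFEST_DATA` marker.

```java
import java.util.*;

// Sketch of the rule: every non-empty manifest goes into pendingResults;
// if nothing was collected at all, keep one empty entry for the latest
// checkpoint so the commit that records the checkpoint id still happens.
class AggregationSketch {

    static NavigableMap<Long, List<String>> aggregate(
            NavigableMap<Long, List<List<String>>> commitRequests) {
        NavigableMap<Long, List<String>> pending = new TreeMap<>();
        for (Map.Entry<Long, List<List<String>>> e : commitRequests.entrySet()) {
            for (List<String> manifests : e.getValue()) {
                for (String manifest : manifests) {
                    // only non-empty manifests end up in pendingResults
                    pending.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(manifest);
                }
            }
        }
        if (pending.isEmpty() && !commitRequests.isEmpty()) {
            // fallback: a single empty entry for the latest checkpoint
            pending.put(commitRequests.lastKey(), Collections.emptyList());
        }
        return pending;
    }
}
```

Checkpoints that contributed only empty committables produce no entry of their own; they are covered by the single fallback entry when no data arrived at all.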
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14810: URL: https://github.com/apache/iceberg/pull/14810#discussion_r2605953980 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -126,9 +125,14 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } -// For every table and every checkpoint, we store the list of to-be-committed -// DynamicCommittable. -// There may be DynamicCommittable from previous checkpoints which have not been committed yet. +/* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream + DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. Review Comment: Let's talk with @mxm and @Guosmilesmile about the timeline here. It would be good to have a specific version mentioned here for the removal of this extra list: when could we say that the state will not contain any old `DynamicCommittable` objects, and how could the users ensure that everything is ready for an upgrade?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on PR #14312: URL: https://github.com/apache/iceberg/pull/14312#issuecomment-3636166241 @pvary I re-opened this PR in https://github.com/apache/iceberg/pull/14810. Could you please take a look?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2605873412
##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java:
##
@@ -24,38 +24,28 @@
import java.io.IOException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
class TestDynamicCommittableSerializer {
+ private static final DynamicCommittable COMMITTABLE =
+ new DynamicCommittable(
+ new TableKey("table", "branch"),
+ new byte[][] {{3, 4}},
+ JobID.generate().toHexString(),
+ new OperatorID().toHexString(),
+ 5);
@Test
void testRoundtrip() throws IOException {
Review Comment:
I added tests for V1 ser/de.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2605872093
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -200,27 +198,27 @@ private void commitPendingRequests(
throws IOException {
long checkpointId = commitRequestMap.lastKey();
List<ManifestFile> manifests = Lists.newArrayList();
-NavigableMap<Long, List<WriteResult>> pendingResults = Maps.newTreeMap();
+NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
for (Map.Entry<Long, List<CommitRequest<DynamicCommittable>>> e :
commitRequestMap.entrySet()) {
+ WriteResult.Builder builder = WriteResult.builder();
for (CommitRequest<DynamicCommittable> committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) {
- pendingResults
- .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
- .add(EMPTY_WRITE_RESULT);
-} else {
- DeltaManifests deltaManifests =
- SimpleVersionedSerialization.readVersionAndDeSerialize(
- DeltaManifestsSerializer.INSTANCE, committable.getCommittable().manifest());
- pendingResults
- .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
- .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
- manifests.addAll(deltaManifests.manifests());
+if (!Arrays.deepEquals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifests())) {
+ for (byte[] manifest : committable.getCommittable().manifests()) {
+DeltaManifests deltaManifests =
+SimpleVersionedSerialization.readVersionAndDeSerialize(
+DeltaManifestsSerializer.INSTANCE, manifest);
+builder.add(
+FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
+manifests.addAll(deltaManifests.manifests());
+ }
}
+
+pendingResults.put(e.getKey(), builder.build());
}
}
-CommitSummary summary = new CommitSummary();
-summary.addAll(pendingResults);
+// TODO: Fix aggregated commit summary logged multiple times per checkpoint commit
Review Comment:
This is a separate issue, unrelated to this change.
The committer logs the same `CommitSummary` across multiple commits for
different checkpoints.
##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -423,6 +426,26 @@ void testPartitionSpecEvolution() throws Exception {
new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2));
runTest(rows);
+
+// Validate the table has expected partition specs
+Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1"));
+
+Map<Integer, PartitionSpec> tableSpecs = table.specs();
+List<PartitionSpec> expectedSpecs = List.of(spec1, spec2, PartitionSpec.unpartitioned());
+
+assertThat(tableSpecs).hasSize(expectedSpecs.size());
+expectedSpecs.forEach(
+expectedSpec ->
+assertThat(
+tableSpecs.values().stream()
+// TODO: Fix PartitionSpecEvolution#evolve to re-use PartitionField names of
Review Comment:
I expect it to be a relatively simple change. However, it seems worth a
separate PR because it doesn't directly relate to the scope of this change.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2605871302
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -119,6 +110,11 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
return;
}
+/*
+ TODO: Replace List<CommitRequest<DynamicCommittable>> with a single CommitRequest
Review Comment:
I updated the comment.
> We could still have multiple DynamicCommittable for a checkpoint - when we
have incoming data for multiple branches. So we still need to have a list. Am I
right?
Each branch would have a separate incoming commit request because commit
requests are grouped by `TableKey` (table and branch). There must be only one
commit request per `TableKey`.
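The grouping invariant described above can be illustrated with plain collections. This is a simplified, hypothetical model, not the real sink classes: results keyed by (table, branch, checkpoint) collapse into one aggregated entry per triplet, which is why a `List` per checkpoint is only needed for state written by older releases.

```java
import java.util.*;

// Hypothetical stand-in for the real key: table, branch, and checkpoint id.
record CommitKey(String table, String branch, long checkpointId) {}

class TripletAggregation {
    // Merge all write results sharing the same (table, branch, checkpoint)
    // triplet into a single aggregated entry per key.
    static Map<CommitKey, List<String>> aggregate(List<Map.Entry<CommitKey, String>> results) {
        Map<CommitKey, List<String>> merged = new HashMap<>();
        for (Map.Entry<CommitKey, String> r : results) {
            merged.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        return merged;
    }
}
```

Two branches of the same table at the same checkpoint produce two distinct keys, so each branch still gets its own single aggregated entry.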
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -155,6 +152,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
}
}
+ /** TODO: Reuse {@link org.apache.iceberg.flink.sink.SinkUtil#getMaxCommittedCheckpointId} */
Review Comment:
I will remove this duplication when porting
https://github.com/apache/iceberg/pull/14517 to the regular `IcebergSink`.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312: URL: https://github.com/apache/iceberg/pull/14312#discussion_r2605870647 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -141,6 +137,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) getMaxCommittedCheckpointId( table, last.jobId(), last.operatorId(), entry.getKey().branch()); // Mark the already committed FilesCommittable(s) as finished + // TODO: Add more logging for visibility on skipped commit requests Review Comment: Done
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
github-actions[bot] commented on PR #14312: URL: https://github.com/apache/iceberg/pull/14312#issuecomment-3594007953 This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
github-actions[bot] closed pull request #14312: Flink: Refactor WriteResult aggregation in DynamicIcebergSink URL: https://github.com/apache/iceberg/pull/14312
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
github-actions[bot] commented on PR #14312: URL: https://github.com/apache/iceberg/pull/14312#issuecomment-3567193874 This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455077190
##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java:
##
@@ -24,38 +24,28 @@
import java.io.IOException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
class TestDynamicCommittableSerializer {
+ private static final DynamicCommittable COMMITTABLE =
+ new DynamicCommittable(
+ new TableKey("table", "branch"),
+ new byte[][] {{3, 4}},
+ JobID.generate().toHexString(),
+ new OperatorID().toHexString(),
+ 5);
@Test
void testRoundtrip() throws IOException {
Review Comment:
How hard would it be to test that the old committables are deserialized
correctly?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455069904
##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -423,6 +426,26 @@ void testPartitionSpecEvolution() throws Exception {
new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2));
runTest(rows);
+
+// Validate the table has expected partition specs
+Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1"));
+
+Map<Integer, PartitionSpec> tableSpecs = table.specs();
+List<PartitionSpec> expectedSpecs = List.of(spec1, spec2, PartitionSpec.unpartitioned());
+
+assertThat(tableSpecs).hasSize(expectedSpecs.size());
+expectedSpecs.forEach(
+expectedSpec ->
+assertThat(
+tableSpecs.values().stream()
+// TODO: Fix PartitionSpecEvolution#evolve to re-use PartitionField names of
Review Comment:
How complicated would this change be in `evolve`?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455061714
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId)
throws IOException {
}
/**
- * Write all the completed data files to a newly created manifest file and return the manifest's
+ * Write all the completed data files to newly created manifest files and return the manifests'
* avro serialized bytes.
*/
@VisibleForTesting
- byte[] writeToManifest(
- WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+ byte[][] writeToManifests(
+ String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)
Review Comment:
Thanks!
Let me take a look
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455056623
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -200,27 +198,27 @@ private void commitPendingRequests(
throws IOException {
long checkpointId = commitRequestMap.lastKey();
List<ManifestFile> manifests = Lists.newArrayList();
-NavigableMap<Long, List<WriteResult>> pendingResults = Maps.newTreeMap();
+NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
for (Map.Entry<Long, List<CommitRequest<DynamicCommittable>>> e :
commitRequestMap.entrySet()) {
+ WriteResult.Builder builder = WriteResult.builder();
for (CommitRequest<DynamicCommittable> committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) {
- pendingResults
- .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
- .add(EMPTY_WRITE_RESULT);
-} else {
- DeltaManifests deltaManifests =
- SimpleVersionedSerialization.readVersionAndDeSerialize(
- DeltaManifestsSerializer.INSTANCE, committable.getCommittable().manifest());
- pendingResults
- .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
- .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
- manifests.addAll(deltaManifests.manifests());
+if (!Arrays.deepEquals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifests())) {
+ for (byte[] manifest : committable.getCommittable().manifests()) {
+DeltaManifests deltaManifests =
+SimpleVersionedSerialization.readVersionAndDeSerialize(
+DeltaManifestsSerializer.INSTANCE, manifest);
+builder.add(
+FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
+manifests.addAll(deltaManifests.manifests());
+ }
}
+
+pendingResults.put(e.getKey(), builder.build());
}
}
-CommitSummary summary = new CommitSummary();
-summary.addAll(pendingResults);
+// TODO: Fix aggregated commit summary logged multiple times per checkpoint commit
Review Comment:
I don't get this comment. Could you please elaborate?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455035563
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -155,6 +152,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
}
}
+ /** TODO: Reuse {@link org.apache.iceberg.flink.sink.SinkUtil#getMaxCommittedCheckpointId} */
Review Comment:
Why is this for the future?
This code is a duplication which we should remove here, or maybe even in a
different commit
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312: URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455028357 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -119,6 +110,11 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } +/* + TODO: Replace List<CommitRequest<DynamicCommittable>> with a single CommitRequest Review Comment: We could still have multiple DynamicCommittable for a checkpoint - when we have incoming data for multiple branches. So we still need to have a list. Am I right?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312: URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455011962 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -119,6 +110,11 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) return; } +/* + TODO: Replace List<CommitRequest<DynamicCommittable>> with a single CommitRequest Review Comment: Is this because we still need to be prepared for the old state, where we have a List of committables? Could we add this to the comment?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455016399
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -155,6 +152,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
}
}
+ /** TODO: Reuse {@link org.apache.iceberg.flink.sink.SinkUtil#getMaxCommittedCheckpointId} */
Review Comment:
This is for the future when the state only contains committables serialized
by the new Aggregator?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312: URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455008466 ## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ## @@ -141,6 +137,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) getMaxCommittedCheckpointId( table, last.jobId(), last.operatorId(), entry.getKey().branch()); // Mark the already committed FilesCommittable(s) as finished + // TODO: Add more logging for visibility on skipped commit requests Review Comment: Could we fix this TODO?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2454238360
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId)
throws IOException {
}
/**
- * Write all the completed data files to a newly created manifest file and return the manifest's
+ * Write all the completed data files to newly created manifest files and return the manifests'
* avro serialized bytes.
*/
@VisibleForTesting
- byte[] writeToManifest(
- WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+ byte[][] writeToManifests(
+ String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)
Review Comment:
@pvary I rebased this PR on top of the latest changes, so it should be good
to review.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2450011710
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId)
throws IOException {
}
/**
- * Write all the completed data files to a newly created manifest file and return the manifest's
+ * Write all the completed data files to newly created manifest files and return the manifests'
* avro serialized bytes.
*/
@VisibleForTesting
- byte[] writeToManifest(
- WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+ byte[][] writeToManifests(
+ String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)
Review Comment:
> Is there anything which would prevent you to use the DynamicSink in 1.10.1?
Not really, just highlighting that we can easily add this to 1.10.1 as well.
I am also okay with this being included in 1.11 if that's the established
practice.
Looking forward to merging this change =)
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2450011710
##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
}
/**
- * Write all the completed data files to a newly created manifest file and return the manifest's
+ * Write all the completed data files to newly created manifest files and return the manifests'
 * avro serialized bytes.
 */
@VisibleForTesting
- byte[] writeToManifest(
-     WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+ byte[][] writeToManifests(
+     String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)
Review Comment:
> Is there anything which would prevent you from using the DynamicSink in 1.10.1?
Not really, just highlighting that we can easily add this to 1.10.1 as well.
I am also okay with this being included in 1.11 if that's the established
practice.
Looking forward to merging this =)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2447236047
Review Comment:
Patch releases are usually for bugfixes.
#14182 is already scheduled for 1.10.1, so that correctness part will be fixed.
If I understand correctly, this PR is an optimization at this point, but I might be wrong. Is there anything which would prevent you from using the DynamicSink in 1.10.1?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2447130079
Review Comment:
> This change will be released in a few months. So other users might have to
wait to get your feature.
@pvary @Guosmilesmile We can now include this change in Iceberg 1.10.1
because it supports previous serialised states and is backward compatible. So
users will be able to upgrade seamlessly.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
Guosmilesmile commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2444826438
Review Comment:
Personally, a seamless upgrade feels like a big win to me (as long as it
doesn’t require a huge engineering effort).
When users have to perform extra steps during an upgrade, it’s confusing—and
even scary—because they can’t tell whether the extra work is caused by this
feature or by something else entirely. If we can make the upgrade invisible, we
spare them that confusion and fear, which is awesome.
So IMHO it’s worth the effort.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2444757010
Review Comment:
This change will be released in a few months, so other users might have to wait to get your feature. I think this means we should try to serve their needs and allow a path for an easy upgrade. Adding a new serializer is not a big issue here, so IMHO it would be worth the effort.
But that's just me, let's see what others think.
CC: @mxm, @Guosmilesmile
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2444125269
Review Comment:
> I'm leaning towards option 1, because I'm a bit skeptical about other serialization methods, and I think we will need a longer time to agree on a way to move forward.
Sure, I added a new version and a method to deserialize the previous committable version to achieve this.
> One argument against it is that the multiple manifest serialization doesn't add too much performance gain for us.
We get a performance gain from this change by not having to write a new manifest for each unique `schemaId`, `upsertMode`, and `equalityFields` in the `WriteTarget`, but only for unique `specIds`.
> In the past, we have always made sure, that users can upgrade their job to
a newer Iceberg version without dropping the state. This is important for long
running jobs, where in-place upgrade is critical.
In my opinion, this approach makes sense for battle-tested and
well-established APIs. The `DynamicIcebergSink` API was only released a few
weeks ago and would not have as many users relying on it. I think quick
iteration and resolution of issues are more beneficial at the early stages of
this code.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2428698701
Review Comment:
Will take a look soon.
QQ in the meantime: how do we handle Iceberg version changes for long-running jobs?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2427677246
Review Comment:
Yes, a manifest can only contain files for a single partition spec.
Yes, the `RowDelta` writes multiple manifests behind the scenes. It keeps
track of `DataFile` and `DeleteFile` by their partition specs. See the
implementation of
[MergingSnapshotProducer](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L87):
```java
// update data
private final Map newDataFilesBySpec = Maps.newHashMap();
private Long newDataFilesDataSequenceNumber;
private final Map newDeleteFilesBySpec = Maps.newHashMap();
```
The current implementation implicitly writes a new temporary manifest for
each unique `WriteTarget`, which creates multiple `DynamicCommittables` per
(table, branch, checkpoint) triplet (incorrect behaviour). See the
implementation of
[DynamicWriteResultAggregator](https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java#L101).
With this refactor, we write even fewer manifests (only for unique partition
specs), which makes the implementation explicit:
1. Create multiple manifests only for different partition specs (similar to
`RowDelta`)
2. Create only one `DynamicCommittable` per checkpoint, and use multiple
manifests for serialisation
3. Remove all assumptions of multiple commit requests in `DynamicCommitter`
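The aggregation step described above can be sketched with plain collections. This is an illustrative toy model, not the actual Iceberg API: `SpecFile` and `groupBySpec` are assumed names, and the point is only that bucketing by `specId` alone yields one manifest per distinct partition spec, regardless of how many `WriteTarget`s produced the files.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupBySpecSketch {
  // Stand-in for a completed data/delete file; only the spec id matters here.
  record SpecFile(int specId, String path) {}

  // Bucket files by partition spec id: each bucket becomes one manifest.
  static Map<Integer, List<SpecFile>> groupBySpec(List<SpecFile> files) {
    return files.stream().collect(Collectors.groupingBy(SpecFile::specId));
  }

  public static void main(String[] args) {
    List<SpecFile> files =
        List.of(
            new SpecFile(1, "data-a.parquet"),
            new SpecFile(1, "data-b.parquet"),
            new SpecFile(2, "data-c.parquet"));
    // Two distinct specs -> two manifests for the whole checkpoint,
    // even if the files came from many different WriteTargets.
    System.out.println(groupBySpec(files).size());
  }
}
```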
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2435490550
Review Comment:
I need some time to think through the implications, and how we handled these
situations in the past.
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2426103810
Review Comment:
We need to create a separate manifest file for each partition spec according
to the [Iceberg spec](https://iceberg.apache.org/spec/#manifests):
> A manifest stores files for a single partition spec. When a table’s
partition spec changes, old files remain in the older manifest and newer files
are written to a new manifest. This is required because a manifest file’s
schema is based on its partition spec (see below).
I attempted to hack this and write a single `ManifestFile` with multiple `DataFiles`/`DeleteFiles` using different partition specs. This approach resulted in incorrect partition specs being returned when the manifest was read back with the `ManifestReader`.
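The constraint quoted from the spec can be illustrated with a toy model (assumed names, not Iceberg's actual `ManifestWriter` API): a writer bound to one partition spec must reject files from another, which is why a single manifest cannot mix specs.

```java
import java.util.ArrayList;
import java.util.List;

public class PerSpecManifestSketch {
  // Stand-in for a data/delete file tagged with its partition spec id.
  record FileEntry(int specId, String path) {}

  // Toy manifest writer: its "schema" is fixed by the spec it was created
  // for, so files from any other spec are rejected.
  static class ToyManifestWriter {
    final int specId;
    final List<FileEntry> entries = new ArrayList<>();

    ToyManifestWriter(int specId) {
      this.specId = specId;
    }

    void add(FileEntry file) {
      if (file.specId() != specId) {
        throw new IllegalArgumentException(
            "Manifest for spec " + specId + " cannot hold a file from spec " + file.specId());
      }
      entries.add(file);
    }
  }

  public static void main(String[] args) {
    ToyManifestWriter writer = new ToyManifestWriter(1);
    writer.add(new FileEntry(1, "data-a.parquet")); // ok: same spec
    try {
      writer.add(new FileEntry(2, "data-b.parquet")); // rejected: different spec
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```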
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2434774536
Review Comment:
What do you think?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2431136207
Review Comment:
> Both the current code and the refactor use ManifestReader/Writer with a
hard-coded version 2 of the Iceberg format for committable state serialisation
in a checkpoint. See
[FlinkManifestUtil](https://github.com/apache/iceberg/blob/main/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java#L44).
But we are changing the `DynamicCommittable` to contain a `TableKey` instead of a `WriteTarget`, and a `byte[][]` instead of a `byte[]`.
Should we add a new version to the `DynamicCommittableSerializer`?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2430867538
Review Comment:
> Will take a look soon.
Thanks, I'd appreciate it.
> QQ in the meantime: How do we handle Iceberg version change for long
running jobs?
Both the current code and the refactor use `ManifestReader`/`Writer` with a
hard-coded version 2 of the Iceberg format for committable state serialisation
in a checkpoint. See
[FlinkManifestUtil](https://github.com/apache/iceberg/blob/main/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java#L44).
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2438463916
Review Comment:
In the past, we have always made sure that users can upgrade their job to a newer Iceberg version without dropping the state. This is important for long-running jobs, where in-place upgrade is critical.
I think we should follow the same pattern here. If we change how we store data in the state, then we need to make sure that the old state can still be read. This is done by versioning the serializer. The groundwork is there, and we need to use it.
I understand it is extra work if we want to change the serialization again, but I'm still not convinced that we have a good solution to that problem.
I see 2 options:
1. Implement a serialization for the multiple manifests now, and remove it if we change it again before the next release
2. Block this PR until we agree upon the next serialization solution.
I'm leaning towards option 1, because I'm a bit skeptical about other serialization methods, and I think we will need a longer time to agree on a way to move forward.
One argument against it is that the multiple manifest serialization doesn't add too much performance gain for us. It "just" helps by simplifying the committer code.
Your thoughts?
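The versioned-serializer pattern from option 1 can be sketched as follows. The wire layout here (version tag, manifest count, length-prefixed payloads) is an assumption for illustration, not the actual `DynamicCommittableSerializer` format; the point is the dispatch on a version tag so that a pre-upgrade checkpoint (single `byte[]`) can still be restored after the payload becomes a `byte[][]`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionedSerializerSketch {
  static final int V1 = 1; // old payload: one manifest (byte[])
  static final int V2 = 2; // new payload: multiple manifests (byte[][])

  // Serialize in the new format: version tag, count, length-prefixed blobs.
  static byte[] serializeV2(byte[][] manifests) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(V2);
    out.writeInt(manifests.length);
    for (byte[] m : manifests) {
      out.writeInt(m.length);
      out.write(m);
    }
    return bos.toByteArray();
  }

  // Deserialize either version: a V1 payload is wrapped into a
  // single-element byte[][] so the rest of the pipeline sees one shape.
  static byte[][] deserialize(byte[] serialized) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
    int version = in.readInt();
    switch (version) {
      case V1:
        return new byte[][] {in.readAllBytes()};
      case V2:
        byte[][] manifests = new byte[in.readInt()][];
        for (int i = 0; i < manifests.length; i++) {
          manifests[i] = new byte[in.readInt()];
          in.readFully(manifests[i]);
        }
        return manifests;
      default:
        throw new IOException("Unknown committable version: " + version);
    }
  }

  public static void main(String[] args) throws IOException {
    byte[][] manifests = {{1, 2, 3}, {4, 5}};
    byte[][] restored = deserialize(serializeV2(manifests));
    System.out.println(restored.length); // 2
  }
}
```

Flink's `SimpleVersionedSerializer` follows the same idea: the framework stores the version alongside the bytes, and `deserialize(version, bytes)` branches on it.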
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2426156572
Review Comment:
Let me confirm:
- A manifest can only contain files for a single partition spec
- The `RowDelta` operation could commit files for multiple partition specs?
How does the `RowDelta` do this? Does it create multiple manifest files
behind the scenes?
Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]
aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2431623073
Review Comment:
Why do we need to maintain checkpoint state backwards compatibility, given
this would go into Iceberg 1.11.0?
I can see that the `DynamicIcebergSink` is annotated with `@Experimental`,
which doesn't promise any compatibility guarantees.
I am also planning to change the committable state serialisation a couple
more times in the following commits to replace manifest serialisation
completely. If we want to keep backwards compatibility, should we introduce a
new version in the last commit?
