Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-19 Thread via GitHub


aiborodin commented on PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#issuecomment-3770854306

   Thank you @pvary for merging the change and for your support and valuable 
feedback!
   Thank you @mxm for reviewing the change and assisting with the release 
planning!
   I appreciate your help.





Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-09 Thread via GitHub


pvary commented on PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#issuecomment-3728339727

   Merged to main.
   Huhh.. this was a big one!
   Thanks @aiborodin for finding the issue and for persisting through all these review rounds!
   Thanks @mxm for the review!





Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-09 Thread via GitHub


pvary merged PR #14810:
URL: https://github.com/apache/iceberg/pull/14810





Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-08 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2674331992


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java:
##
@@ -46,26 +47,61 @@ public byte[] serialize(DynamicCommittable committable) throws IOException {
 view.writeUTF(committable.jobId());
 view.writeUTF(committable.operatorId());
 view.writeLong(committable.checkpointId());
-view.writeInt(committable.manifest().length);
-view.write(committable.manifest());
+
+int numManifests = committable.manifests().length;
+view.writeInt(numManifests);
+for (int i = 0; i < numManifests; i++) {
+  byte[] manifest = committable.manifests()[i];
+  view.writeInt(manifest.length);
+  view.write(manifest);
+}
+
 return out.toByteArray();
   }
 
   @Override
   public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException {
-if (version == 1) {
-  DataInputDeserializer view = new DataInputDeserializer(serialized);
-  WriteTarget key = WriteTarget.deserializeFrom(view);
-  String jobId = view.readUTF();
-  String operatorId = view.readUTF();
-  long checkpointId = view.readLong();
-  int manifestLen = view.readInt();
-  byte[] manifestBuf;
-  manifestBuf = new byte[manifestLen];
-  view.read(manifestBuf);
-  return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId);
+if (version == VERSION_1) {
+  return deserializeV1(serialized);
+} else if (version == VERSION_2) {
+  return deserializeV2(serialized);
 }
 
 throw new IOException("Unrecognized version or corrupt state: " + version);
   }
+
+  private DynamicCommittable deserializeV1(byte[] serialized) throws IOException {
+DataInputDeserializer view = new DataInputDeserializer(serialized);
+WriteTarget key = WriteTarget.deserializeFrom(view);
+String jobId = view.readUTF();
+String operatorId = view.readUTF();
+long checkpointId = view.readLong();
+int manifestLen = view.readInt();
+byte[] manifestBuf;
+manifestBuf = new byte[manifestLen];

Review Comment:
   Done
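
For readers following the serialization change in the hunk above: the write side length-prefixes each manifest, so the matching read path has to rebuild the `byte[][]` the same way. A minimal sketch of just that manifest-reading step, assuming the input view is already positioned at the manifest count (the PR's actual deserializeV2 is not quoted in this thread and may differ):

```java
import java.io.IOException;
import org.apache.flink.core.memory.DataInputDeserializer;

// Illustrative sketch only: reads the length-prefixed manifest section written by the
// serialize() loop quoted above. The view is assumed to be positioned at the manifest
// count; the PR's real deserializeV2 method is not shown here.
final class ManifestSectionReader {
  private ManifestSectionReader() {}

  static byte[][] readManifests(DataInputDeserializer view) throws IOException {
    int numManifests = view.readInt();
    byte[][] manifests = new byte[numManifests][];
    for (int i = 0; i < numManifests; i++) {
      byte[] manifest = new byte[view.readInt()];
      view.readFully(manifest);
      manifests[i] = manifest;
    }
    return manifests;
  }
}
```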






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-08 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2671922515


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java:
##
@@ -46,26 +47,61 @@ public byte[] serialize(DynamicCommittable committable) throws IOException {
 view.writeUTF(committable.jobId());
 view.writeUTF(committable.operatorId());
 view.writeLong(committable.checkpointId());
-view.writeInt(committable.manifest().length);
-view.write(committable.manifest());
+
+int numManifests = committable.manifests().length;
+view.writeInt(numManifests);
+for (int i = 0; i < numManifests; i++) {
+  byte[] manifest = committable.manifests()[i];
+  view.writeInt(manifest.length);
+  view.write(manifest);
+}
+
 return out.toByteArray();
   }
 
   @Override
   public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException {
-if (version == 1) {
-  DataInputDeserializer view = new DataInputDeserializer(serialized);
-  WriteTarget key = WriteTarget.deserializeFrom(view);
-  String jobId = view.readUTF();
-  String operatorId = view.readUTF();
-  long checkpointId = view.readLong();
-  int manifestLen = view.readInt();
-  byte[] manifestBuf;
-  manifestBuf = new byte[manifestLen];
-  view.read(manifestBuf);
-  return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId);
+if (version == VERSION_1) {
+  return deserializeV1(serialized);
+} else if (version == VERSION_2) {
+  return deserializeV2(serialized);
 }
 
 throw new IOException("Unrecognized version or corrupt state: " + version);
   }
+
+  private DynamicCommittable deserializeV1(byte[] serialized) throws IOException {
+DataInputDeserializer view = new DataInputDeserializer(serialized);
+WriteTarget key = WriteTarget.deserializeFrom(view);
+String jobId = view.readUTF();
+String operatorId = view.readUTF();
+long checkpointId = view.readLong();
+int manifestLen = view.readInt();
+byte[] manifestBuf;
+manifestBuf = new byte[manifestLen];

Review Comment:
   nit: should we put this into a single line?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-08 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2671922515


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java:
##
@@ -46,26 +47,61 @@ public byte[] serialize(DynamicCommittable committable) throws IOException {
 view.writeUTF(committable.jobId());
 view.writeUTF(committable.operatorId());
 view.writeLong(committable.checkpointId());
-view.writeInt(committable.manifest().length);
-view.write(committable.manifest());
+
+int numManifests = committable.manifests().length;
+view.writeInt(numManifests);
+for (int i = 0; i < numManifests; i++) {
+  byte[] manifest = committable.manifests()[i];
+  view.writeInt(manifest.length);
+  view.write(manifest);
+}
+
 return out.toByteArray();
   }
 
   @Override
   public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException {
-if (version == 1) {
-  DataInputDeserializer view = new DataInputDeserializer(serialized);
-  WriteTarget key = WriteTarget.deserializeFrom(view);
-  String jobId = view.readUTF();
-  String operatorId = view.readUTF();
-  long checkpointId = view.readLong();
-  int manifestLen = view.readInt();
-  byte[] manifestBuf;
-  manifestBuf = new byte[manifestLen];
-  view.read(manifestBuf);
-  return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId);
+if (version == VERSION_1) {
+  return deserializeV1(serialized);
+} else if (version == VERSION_2) {
+  return deserializeV2(serialized);
 }
 
 throw new IOException("Unrecognized version or corrupt state: " + version);
   }
+
+  private DynamicCommittable deserializeV1(byte[] serialized) throws IOException {
+DataInputDeserializer view = new DataInputDeserializer(serialized);
+WriteTarget key = WriteTarget.deserializeFrom(view);
+String jobId = view.readUTF();
+String operatorId = view.readUTF();
+long checkpointId = view.readLong();
+int manifestLen = view.readInt();
+byte[] manifestBuf;
+manifestBuf = new byte[manifestLen];

Review Comment:
   nit: should we put this into a single line? And maybe a new line before this?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-07 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2671078899


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   I think it's fine to delay the List cleanup until the 1.12 release to keep 
things simple.






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-07 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2667505543


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   > Could we change the uid for the committer somehow? If we could do it, then we could enforce using a new state for the new committer.
   
   From the Flink code, the committable state is in the `CommitterOperator`, 
controlled by the `SinkTransformationTranslator` internally in Flink. So it 
seems we can't change the UID of the committer operator.






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-06 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2665342879


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   > > Could we change the uid for the committer somehow? If we could do it, then we could enforce using a new state for the new committer.
   >
   > This wouldn't really improve the state migration process because there 
will still be unmapped old state during migration.
   
   It would cause a failure if the job is restarted without setting `allowNonRestoredState`, which might prevent users from accidentally forgetting to set this config.






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-06 Thread via GitHub


mxm commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2664830343


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   I think it is acceptable to advise users to upgrade their applications to 
1.11.0 via a stop-with-savepoint. This note would have to go into an official 
migration guide.
   
   >Could we change the uid for the committer somehow? If we could do it, then we could enforce using a new state for the new committer.
   
   This wouldn't really improve the state migration process because there will 
still be unmapped old state during migration.
   
   >If we simplify the code and remove lists in this change, the committer 
state would still be compatible and have the same operator ID
   
   That would be nice if it doesn't introduce too much additional complexity.
   
   
   






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-06 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2664410859


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   > If we simplify the code and remove lists in this change, the committer 
state would still be compatible and have the same operator ID
   
   Could we change the `uid` for the committer somehow? If we could do it, then we could enforce using a new state for the new committer.
   
   > An alternative, potentially simpler solution would be to release this 
change in another patch release: 1.10.2.
   
   I still think that this change is not something we should release in a patch release. So if we go down this road, then we should keep this change in 1.11 and drop the code in 1.12.0.
   






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-05 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2663979684


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   We can do this. The only caveat is that I don't know how to programmatically enforce that users upgrade with `stop-with-savepoint`. If we simplify the code and remove lists in this change, the committer state would still be compatible and have the same operator ID. So if users upgrade without a prior `stop-with-savepoint`, the committer would skip some committables if there were more than one in the state per (table, branch, checkpoint). So we would need some way to prevent accidental upgrades without a prior `stop-with-savepoint`.
   
   An alternative, potentially simpler solution would be to release this change in another patch release: 1.10.2. It would allow us to have a version that automatically migrates the state. We could then remove lists as a separate change in 1.11.0 and add logic to detect the old state with multiple committables per (table, branch, checkpoint), which would fail the job and ask users to upgrade to the previous version first (1.10.2) to migrate the state.
   What do you think?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-05 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2663979684


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   We can do this. The only caveat is that I don't know how to programmatically enforce that users upgrade with `stop-with-savepoint`. If we simplify the code and remove lists in this change, the committer state would still be compatible and have the same operator ID. So if users upgrade without a prior `stop-with-savepoint`, the committer would skip some committables if there were more than one in the state per (table, branch, checkpoint). So we need some way to prevent accidental upgrades without a prior `stop-with-savepoint`.
   
   An alternative, potentially simpler solution would be to release this change in another patch release: 1.10.2. It would allow us to have a version that automatically migrates the state. We could then remove lists as a separate change in 1.11.0 and add logic to detect the old state with multiple committables per (table, branch, checkpoint), which would fail the job and ask users to upgrade to the previous version first (1.10.2) to migrate the state.
   What do you think?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2026-01-05 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2661544342


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   Please help me recall where we stand on this comment.
   
   A few things:
   - 1.10.1 is released, but (because of my questions) this PR did not make it 
there
   - 1.11.0 is the next planned release
   - If I understand correctly, since the Dynamic Sink is experimental, we can 
be a bit more lax with the state restore expectations
   
   @aiborodin, @mxm, @Guosmilesmile: What do you think about simplifying the code, forcing users to upgrade via `stop-with-savepoint` and restore with `allowNonRestoredState`, and documenting it in the release notes?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-18 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2633639005


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   Releasing this change in the next patch release (e.g. 1.10.1) would allow us to address the following points:
   1. Fix Flink committable metrics: currently `flink.taskmanager.job.task.operator.successfulCommittables` and similar report incorrect values due to multiple committables coming into the committer, but only the aggregated one getting committed.
   2. Avoid writing extra manifests: the sink would only write one temporary manifest per table, branch, and partition spec, while currently it writes a new temporary manifest for each unique WriteTarget (table, branch, schemaId, specId, upsertMode, equalityFields).
   
   It would also allow us to simplify the code a lot, as we could remove lists 
in the next minor release (1.11).






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-18 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2633636208


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+class TableKey implements Serializable {

Review Comment:
   Acknowledged



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java:
##
@@ -46,26 +47,59 @@ public byte[] serialize(DynamicCommittable committable) throws IOException {
 view.writeUTF(committable.jobId());
 view.writeUTF(committable.operatorId());
 view.writeLong(committable.checkpointId());
-view.writeInt(committable.manifest().length);
-view.write(committable.manifest());
+
+view.writeInt(committable.manifests().length);
+for (int i = 0; i < committable.manifests().length; i++) {
+  view.writeInt(committable.manifests()[i].length);
+  view.write(committable.manifests()[i]);
+}

Review Comment:
   Acknowledged






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-18 Thread via GitHub


mxm commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2630631249


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java:
##
@@ -46,26 +47,59 @@ public byte[] serialize(DynamicCommittable committable) throws IOException {
 view.writeUTF(committable.jobId());
 view.writeUTF(committable.operatorId());
 view.writeLong(committable.checkpointId());
-view.writeInt(committable.manifest().length);
-view.write(committable.manifest());
+
+view.writeInt(committable.manifests().length);
+for (int i = 0; i < committable.manifests().length; i++) {
+  view.writeInt(committable.manifests()[i].length);
+  view.write(committable.manifests()[i]);
+}

Review Comment:
   I prefer this for readability:
   
   ```suggestion
    int numManifests = committable.manifests().length;
    view.writeInt(numManifests);
    for (int i = 0; i < numManifests; i++) {
      byte[] manifest = committable.manifests()[i];
      view.writeInt(manifest.length);
      view.write(manifest);
    }
   ```



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+class TableKey implements Serializable {

Review Comment:
   ```suggestion
   class TableKey {
   ```
   
   This shouldn't be using Java serialization, as we have our own serialization 
logic in this class.
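
A minimal sketch of the hand-rolled alternative this comment points at, assuming for illustration that TableKey carries a table name and a branch (its fields are not visible in the quoted hunk); the real class in the PR may differ:

```java
import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Illustrative only: hand-rolled (de)serialization instead of java.io.Serializable.
// The tableName/branch fields are assumptions for this sketch.
final class TableKeySketch {
  private final String tableName;
  private final String branch;

  TableKeySketch(String tableName, String branch) {
    this.tableName = tableName;
    this.branch = branch;
  }

  void serializeTo(DataOutputView view) throws IOException {
    view.writeUTF(tableName);
    view.writeUTF(branch);
  }

  static TableKeySketch deserializeFrom(DataInputView view) throws IOException {
    return new TableKeySketch(view.readUTF(), view.readUTF());
  }
}
```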



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   Is it strictly necessary to merge this change to 1.10.1? The original bug 
which motivated this change has already been fixed in 1.10.1. The changes here 
are great, but I think they can wait until 1.11.0.






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-17 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2625993848


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +213,32 @@ private void commitPendingRequests(
 NavigableMap> pendingResults = Maps.newTreeMap();
 for (Map.Entry>> e : commitRequestMap.entrySet()) {
   for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) {

Review Comment:
   Done






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-16 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2622378706


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +213,32 @@ private void commitPendingRequests(
 NavigableMap> pendingResults = Maps.newTreeMap();
 for (Map.Entry>> e : commitRequestMap.entrySet()) {
   for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) {

Review Comment:
   If we remove `EMPTY_MANIFEST_DATA` handling (as it is not really used), then 
we should remove
   ```
  static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
   [..]
 private final Map maxContinuousEmptyCommitsMap;
 private final Map continuousEmptyCheckpointsMap;
   ```






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-16 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2622366781


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -57,11 +58,11 @@ class DynamicWriteResultAggregator
 implements OneInputStreamOperator<
 CommittableMessage, CommittableMessage> {
   private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
-  private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+  private static final byte[][] EMPTY_MANIFEST_DATA = new byte[0][];

Review Comment:
   Discussed it with @mxm, and we probably can remove the whole 
`EMPTY_MANIFEST_DATA` stuff from the `DynamicIcebergSink`






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-15 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2622049013


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +119,15 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the
+  upstream DynamicWriteResultAggregator. Iceberg 1.11.0 will remove this, and users should upgrade to the latest

Review Comment:
   Acknowledged



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -57,11 +58,11 @@ class DynamicWriteResultAggregator
 implements OneInputStreamOperator<
 CommittableMessage, CommittableMessage> {
   private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
-  private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+  private static final byte[][] EMPTY_MANIFEST_DATA = new byte[0][];

Review Comment:
   Good point. It appears that we don't need this because we use a Map in the 
dynamic sink aggregator, which can't have an empty list of `WriteResults`. I 
think it was ported from the `IcebergWriteAggregator`. I removed it.






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-15 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2619051403


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -57,11 +58,11 @@ class DynamicWriteResultAggregator
 implements OneInputStreamOperator<
 CommittableMessage, CommittableMessage> {
   private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
-  private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+  private static final byte[][] EMPTY_MANIFEST_DATA = new byte[0][];

Review Comment:
   QQ: When can we have empty manifest data? Do we need it with the Dynamic 
Sink?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-15 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2618965830


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +119,15 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the
+  upstream DynamicWriteResultAggregator. Iceberg 1.11.0 will remove this, and users should upgrade to the latest

Review Comment:
   This is just a technicality, but the bug is not there.
   
   Maybe:
   ```
  to process the Flink state from previous releases, which had multiple commit requests in the
 upstream DynamicWriteResultAggregator.
   ```






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-15 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2618592470


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   I agree that we can discuss the removal separately. The comment contained the timeline; that is why I asked for your thoughts here.






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-14 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2618007875


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   @pvary We can keep lists of committables for a longer period. Removing them from DynamicCommitter is a separate step, independent of this change. The current change is fully backward compatible.
   
   That said, I'd prefer to remove the lists of committables early, in the next minor release, 1.11, because they create unnecessary complications and pollute the code, making it harder to reason about.
   Additionally, we will still provide users with a smooth migration path. Users can upgrade to patch version 1.10.1 to resolve their current state and then upgrade to the latest minor version: 1.11+. We can always detect multiple committables in the `DynamicCommitter` state and ask users to upgrade to the previous patch version first.
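
A hypothetical sketch of the "detect multiple committables and fail" idea mentioned above; the key and committable representations are placeholders, not the PR's classes:

```java
import java.util.List;
import java.util.Map;

// Hypothetical guard: once List support is dropped, fail fast if restored state still
// contains more than one committable per (table, branch, checkpoint) key, pointing
// users at the intermediate upgrade path. String keys and byte[] committables are
// placeholders for this sketch.
final class LegacyCommittableStateGuard {
  private LegacyCommittableStateGuard() {}

  static void validate(Map<String, List<byte[]>> committablesPerTableBranchCheckpoint) {
    committablesPerTableBranchCheckpoint.forEach(
        (key, committables) -> {
          if (committables.size() > 1) {
            throw new IllegalStateException(
                "Found " + committables.size() + " committables for " + key
                    + ". Upgrade to the patch release that migrates this state first.");
          }
        });
  }
}
```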






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-14 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2609629056


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   I see the following timeline for the upgrade and the state migration:
   1. We release this fix in the next Iceberg patch version - 1.10.1. It allows 
users to pick up the new change quickly and migrate their state to a single 
committable per table/branch/checkpoint.
   2. We remove the support of lists in the next minor release: 1.11.0.
   
   This timeframe gives users enough time for a smooth migration. I updated the 
comment to reflect this.
   
   What do you think?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-12 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2613353949


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit request.
+  There may be commit requests from previous checkpoints which have not been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a single CommitRequest
+  to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release.

Review Comment:
   With the old Sink we had the following guarantees:
   - You were able to upgrade between Iceberg minor versions without any issue
   - Flink version upgrade needed an application stop-start. In this case the 
committable state was cleaned (notifyCheckpointComplete was run), and these 
incompatibilities were not an issue.
   
   With the new SinkV2 API, I'm not sure we have any guarantees for cleaning up the committables from the state; if there are none, we might need to keep these things around for a longer period.






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-11 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2612912900


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
 return equalityFields;
   }
 
-  void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
-  view.writeInt(equalityField);
-}
-  }
-
   static WriteTarget deserializeFrom(DataInputView view) throws IOException {

Review Comment:
   We can still keep this to support upgrading simple cases directly into the next minor version, 1.11+ (where there's a single V1 committable per table, branch, checkpoint in the Flink state). It would work even after we remove lists of committables in DynamicCommitter.
   Other use cases would upgrade to the patch version first (1.10.1) to fix their state. We can also throw an exception in the DynamicCommitter if we detect this and ask users to upgrade to 1.10.1 first.






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-11 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2612912900


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
 return equalityFields;
   }
 
-  void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
-  view.writeInt(equalityField);
-}
-  }
-
   static WriteTarget deserializeFrom(DataInputView view) throws IOException {

Review Comment:
   We can still keep this to support upgrading simple use cases where there's a 
single V1 committable per table, branch, checkpoint directly into the latest 
version (1.11+). It would work even after we remove lists of committables in 
DynamicCommitter. Other use cases would upgrade to the patch version first (1.10.1) to fix their state.






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-11 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2612912900


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
 return equalityFields;
   }
 
-  void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
-  view.writeInt(equalityField);
-}
-  }
-
   static WriteTarget deserializeFrom(DataInputView view) throws IOException {

Review Comment:
   We can still keep this to support upgrading simple use cases (where there's a 
single V1 committable per table, branch, and checkpoint) directly into the new 
major version (1.11+). It would work even after we remove lists of committables 
in DynamicCommitter. Other use cases would upgrade to the new minor version 
first (1.10.1) to fix their state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-11 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2611439580


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
 return equalityFields;
   }
 
-  void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
-  view.writeInt(equalityField);
-}
-  }
-
   static WriteTarget deserializeFrom(DataInputView view) throws IOException {

Review Comment:
   So we can deprecate this too, and it will be removed as soon as we no longer 
have any old DynamicCommittable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-11 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2609631494


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
 return equalityFields;
   }
 
-  void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
-  view.writeInt(equalityField);
-}
-  }
-

Review Comment:
   We don't use this in production because we don't serialise the WriteTarget. 
We still need the deserialise method, though, to read V1 committables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-11 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2609630549


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java:
##
@@ -41,6 +41,7 @@ public byte[] serialize(DynamicWriteResult writeResult) 
throws IOException {
 ByteArrayOutputStream out = new ByteArrayOutputStream();
 DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
 writeResult.key().serializeTo(view);
+view.writeInt(writeResult.specId());

Review Comment:
   WriteResults are serialized only at runtime for communication between 
writers and the aggregator, so this will work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-11 Thread via GitHub


aiborodin commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2609629056


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void 
commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of 
to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have 
not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit 
request.
+  There may be commit requests from previous checkpoints which have not 
been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a 
single CommitRequest
+  to process the Flink state from previous releases, which had multiple 
commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single 
commit request in the next major release.

Review Comment:
   I see the following timeline for the upgrade and the state migration:
   1. We release this fix in the next minor Iceberg version - 1.10.1. It allows 
users to pick up the new change quickly and migrate their state to a single 
committable per table/branch/checkpoint.
   2. We remove support for lists in the next major release: 1.11.0.
   
   This timeframe gives users enough time for a smooth migration. I updated the 
comment to reflect this.
   
   What do you think?



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +218,40 @@ private void commitPendingRequests(
 NavigableMap> pendingResults = Maps.newTreeMap();
 for (Map.Entry>> e : 
commitRequestMap.entrySet()) {
   for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifest())) {
+if (Arrays.deepEquals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifests())) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606098030


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java:
##
@@ -75,18 +74,6 @@ Set equalityFields() {
 return equalityFields;
   }
 
-  void serializeTo(DataOutputView view) throws IOException {
-view.writeUTF(tableName);
-view.writeUTF(branch);
-view.writeInt(schemaId);
-view.writeInt(specId);
-view.writeBoolean(upsertMode);
-view.writeInt(equalityFields.size());
-for (Integer equalityField : equalityFields) {
-  view.writeInt(equalityField);
-}
-  }
-

Review Comment:
   Why do we remove this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606087020


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java:
##
@@ -41,6 +41,7 @@ public byte[] serialize(DynamicWriteResult writeResult) 
throws IOException {
 ByteArrayOutputStream out = new ByteArrayOutputStream();
 DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
 writeResult.key().serializeTo(view);
+view.writeInt(writeResult.specId());

Review Comment:
   Do we need to have a new version? WriteResults serialized with the old 
version will not be readable with the new serializer



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606068307


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +218,40 @@ private void commitPendingRequests(
 NavigableMap> pendingResults = Maps.newTreeMap();
 for (Map.Entry>> e : 
commitRequestMap.entrySet()) {
   for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifest())) {
+if (Arrays.deepEquals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifests())) {

Review Comment:
   Could we just check for `committable.getCommittable().manifests().isEmpty()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606065163


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +218,40 @@ private void commitPendingRequests(
 NavigableMap> pendingResults = Maps.newTreeMap();
 for (Map.Entry>> e : 
commitRequestMap.entrySet()) {
   for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifest())) {
+if (Arrays.deepEquals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifests())) {

Review Comment:
   Maybe, it is ok.
   NVM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2606020361


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -210,36 +218,40 @@ private void commitPendingRequests(
 NavigableMap> pendingResults = Maps.newTreeMap();
 for (Map.Entry>> e : 
commitRequestMap.entrySet()) {
   for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifest())) {
+if (Arrays.deepEquals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifests())) {

Review Comment:
   This logic is not good in this form. We need to combine it with the `manifests` 
loop below.
   
   The end goal we want to achieve:
   - Every manifest which is not `EMPTY_MANIFEST_DATA` should end up in 
`pendingResults`
   - If `pendingResults` is empty, then put a single `EMPTY_MANIFEST_DATA` 
into it
   
   We shouldn't put `EMPTY_MANIFEST_DATA` into `pendingResults` otherwise.
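
   A rough sketch of that end goal, reusing the names from the diff above 
(`requests` stands for the commit requests of one checkpoint; `EMPTY_WRITE_RESULT`, 
`pendingResults` and the outer `manifests` list are the existing fields/locals); 
illustrative only, not the final patch:

```java
// Sketch only: every real manifest goes into the per-checkpoint WriteResult;
// if nothing was written, fall back to a single empty WriteResult so the
// max-committed-checkpoint-id still advances.
WriteResult.Builder builder = WriteResult.builder();
boolean hasManifests = false;
for (CommitRequest<DynamicCommittable> request : requests) {
  byte[][] serialized = request.getCommittable().manifests();
  if (Arrays.deepEquals(EMPTY_MANIFEST_DATA, serialized)) {
    continue; // nothing to commit for this committable
  }

  for (byte[] manifest : serialized) {
    DeltaManifests deltaManifests =
        SimpleVersionedSerialization.readVersionAndDeSerialize(
            DeltaManifestsSerializer.INSTANCE, manifest);
    builder.add(FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
    manifests.addAll(deltaManifests.manifests());
    hasManifests = true;
  }
}

pendingResults.put(checkpointId, hasManifests ? builder.build() : EMPTY_WRITE_RESULT);
```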



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


pvary commented on code in PR #14810:
URL: https://github.com/apache/iceberg/pull/14810#discussion_r2605953980


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -126,9 +125,14 @@ public void 
commit(Collection> commitRequests)
   return;
 }
 
-// For every table and every checkpoint, we store the list of 
to-be-committed
-// DynamicCommittable.
-// There may be DynamicCommittable from previous checkpoints which have 
not been committed yet.
+/*
+  Each (table, branch, checkpoint) triplet must have only one commit 
request.
+  There may be commit requests from previous checkpoints which have not 
been committed yet.
+
+  We currently keep a List of commit requests per checkpoint instead of a 
single CommitRequest
+  to process the Flink state from previous releases, which had multiple 
commit requests due to a bug in the upstream
+  DynamicWriteResultAggregator. We should replace this with a single 
commit request in the next major release.

Review Comment:
   Let's talk with @mxm and @Guosmilesmile about the timeline here.
   It would be good to mention a specific version for the removal of this extra 
list. When can we say that the state will no longer contain any old 
`DynamicCommittable` objects, and how can users ensure that everything is 
ready for an upgrade?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


aiborodin commented on PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#issuecomment-3636166241

   @pvary I re-opened this PR in https://github.com/apache/iceberg/pull/14810.
   Could you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2605873412


##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java:
##
@@ -24,38 +24,28 @@
 import java.io.IOException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;
 
 class TestDynamicCommittableSerializer {
+  private static final DynamicCommittable COMMITTABLE =
+  new DynamicCommittable(
+  new TableKey("table", "branch"),
+  new byte[][] {{3, 4}},
+  JobID.generate().toHexString(),
+  new OperatorID().toHexString(),
+  5);
 
   @Test
   void testRoundtrip() throws IOException {

Review Comment:
   I added tests for V1 ser/de.
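
   For reference, a hand-rolled V1 payload check could look roughly like this 
(field order taken from the removed `WriteTarget#serializeTo` and the V1 
committable layout; the constants and assertions are illustrative, not the 
actual test code):

```java
// Sketch only: write a V1-style payload by hand and make sure the current
// serializer still restores it as a single-manifest committable.
DataOutputSerializer out = new DataOutputSerializer(256);
out.writeUTF("table");     // WriteTarget.tableName
out.writeUTF("branch");    // WriteTarget.branch
out.writeInt(1);           // schemaId
out.writeInt(2);           // specId
out.writeBoolean(false);   // upsertMode
out.writeInt(0);           // number of equality fields
out.writeUTF(JobID.generate().toHexString());
out.writeUTF(new OperatorID().toHexString());
out.writeLong(5L);         // checkpointId
byte[] manifest = new byte[] {3, 4};
out.writeInt(manifest.length);
out.write(manifest);

DynamicCommittable restored =
    new DynamicCommittableSerializer().deserialize(1, out.getCopyOfBuffer());
assertThat(restored.checkpointId()).isEqualTo(5L);
assertThat(restored.manifests()[0]).containsExactly((byte) 3, (byte) 4);
```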



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2605872093


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -200,27 +198,27 @@ private void commitPendingRequests(
   throws IOException {
 long checkpointId = commitRequestMap.lastKey();
 List manifests = Lists.newArrayList();
-NavigableMap> pendingResults = Maps.newTreeMap();
+NavigableMap pendingResults = Maps.newTreeMap();
 for (Map.Entry>> e : 
commitRequestMap.entrySet()) {
+  WriteResult.Builder builder = WriteResult.builder();
   for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifest())) {
-  pendingResults
-  .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
-  .add(EMPTY_WRITE_RESULT);
-} else {
-  DeltaManifests deltaManifests =
-  SimpleVersionedSerialization.readVersionAndDeSerialize(
-  DeltaManifestsSerializer.INSTANCE, 
committable.getCommittable().manifest());
-  pendingResults
-  .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
-  .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, 
table.io(), table.specs()));
-  manifests.addAll(deltaManifests.manifests());
+if (!Arrays.deepEquals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifests())) {
+  for (byte[] manifest : committable.getCommittable().manifests()) {
+DeltaManifests deltaManifests =
+SimpleVersionedSerialization.readVersionAndDeSerialize(
+DeltaManifestsSerializer.INSTANCE, manifest);
+builder.add(
+FlinkManifestUtil.readCompletedFiles(deltaManifests, 
table.io(), table.specs()));
+manifests.addAll(deltaManifests.manifests());
+  }
 }
+
+pendingResults.put(e.getKey(), builder.build());
   }
 }
 
-CommitSummary summary = new CommitSummary();
-summary.addAll(pendingResults);
+// TODO: Fix aggregated commit summary logged multiple times per each each 
checkpoint commit

Review Comment:
   This is a separate issue, unrelated to this change. 
   The committer logs the same `CommitSummary` across multiple commits for 
different checkpoints.



##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -423,6 +426,26 @@ void testPartitionSpecEvolution() throws Exception {
 new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", 
spec2));
 
 runTest(rows);
+
+// Validate the table has expected partition specs
+Table table = 
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1"));
+
+Map tableSpecs = table.specs();
+List expectedSpecs = List.of(spec1, spec2, 
PartitionSpec.unpartitioned());
+
+assertThat(tableSpecs).hasSize(expectedSpecs.size());
+expectedSpecs.forEach(
+expectedSpec ->
+assertThat(
+tableSpecs.values().stream()
+// TODO: Fix PartitionSpecEvolution#evolve to re-use 
PartitionField names of

Review Comment:
   I expect it to be a relatively simple change. However, it seems worth a 
separate PR because it doesn't directly relate to the scope of this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2605871302


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -119,6 +110,11 @@ public void 
commit(Collection> commitRequests)
   return;
 }
 
+/*
+ TODO: Replace List> with a single 
CommitRequest

Review Comment:
   I updated the comment.
   
   > We could still have multiple DynamicCommittable for a checkpoint - when we 
have incoming data for multiple branches. So we still need to have a list. Am I 
right?
   
   Each branch would have a separate incoming commit request because commit 
requests are grouped by `TableKey` (table and branch), so there must be only 
one commit request per `TableKey`.
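
   As a rough illustration of that grouping on the aggregator side (the local 
variable names, the `TableKey` accessors, and the `emit(...)` call are 
assumptions for the sketch, not the actual code):

```java
// Sketch only: bucket write results by (table, branch) so that every TableKey
// emits exactly one DynamicCommittable per checkpoint.
Map<TableKey, List<DynamicWriteResult>> resultsByKey = Maps.newHashMap();
for (DynamicWriteResult result : writeResults) {
  // accessor names on WriteTarget/TableKey are assumed for this sketch
  TableKey key = new TableKey(result.key().tableName(), result.key().branch());
  resultsByKey.computeIfAbsent(key, unused -> Lists.newArrayList()).add(result);
}

for (Map.Entry<TableKey, List<DynamicWriteResult>> entry : resultsByKey.entrySet()) {
  // writeToManifests(...) may still return several manifests (one per partition spec)
  byte[][] manifests =
      writeToManifests(entry.getKey().tableName(), entry.getValue(), checkpointId);
  emit(new DynamicCommittable(entry.getKey(), manifests, jobId, operatorId, checkpointId));
}
```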



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -155,6 +152,7 @@ public void 
commit(Collection> commitRequests)
 }
   }
 
+  /** TODO: Reuse {@link 
org.apache.iceberg.flink.sink.SinkUtil#getMaxCommittedCheckpointId} * */

Review Comment:
   I will remove this duplication when porting 
https://github.com/apache/iceberg/pull/14517 to the regular `IcebergSink`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-12-10 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2605870647


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -141,6 +137,7 @@ public void 
commit(Collection> commitRequests)
   getMaxCommittedCheckpointId(
   table, last.jobId(), last.operatorId(), entry.getKey().branch());
   // Mark the already committed FilesCommittable(s) as finished
+  // TODO: Add more logging for visibility on skipped commit requests

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-11-30 Thread via GitHub


github-actions[bot] commented on PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#issuecomment-3594007953

   This pull request has been closed due to lack of activity. This is not a 
judgement on the merit of the PR in any way. It is just a way of keeping the PR 
queue manageable. If you think that is incorrect, or the pull request requires 
review, you can revive the PR at any time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-11-30 Thread via GitHub


github-actions[bot] closed pull request #14312: Flink: Refactor WriteResult 
aggregation in DynamicIcebergSink
URL: https://github.com/apache/iceberg/pull/14312


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-11-22 Thread via GitHub


github-actions[bot] commented on PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#issuecomment-3567193874

   This pull request has been marked as stale due to 30 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the [email protected] list. Thank you for your 
contributions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455077190


##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java:
##
@@ -24,38 +24,28 @@
 import java.io.IOException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;
 
 class TestDynamicCommittableSerializer {
+  private static final DynamicCommittable COMMITTABLE =
+  new DynamicCommittable(
+  new TableKey("table", "branch"),
+  new byte[][] {{3, 4}},
+  JobID.generate().toHexString(),
+  new OperatorID().toHexString(),
+  5);
 
   @Test
   void testRoundtrip() throws IOException {

Review Comment:
   How hard would it be to test that the old committables are deserialized 
correctly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455069904


##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -423,6 +426,26 @@ void testPartitionSpecEvolution() throws Exception {
 new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", 
spec2));
 
 runTest(rows);
+
+// Validate the table has expected partition specs
+Table table = 
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1"));
+
+Map tableSpecs = table.specs();
+List expectedSpecs = List.of(spec1, spec2, 
PartitionSpec.unpartitioned());
+
+assertThat(tableSpecs).hasSize(expectedSpecs.size());
+expectedSpecs.forEach(
+expectedSpec ->
+assertThat(
+tableSpecs.values().stream()
+// TODO: Fix PartitionSpecEvolution#evolve to re-use 
PartitionField names of

Review Comment:
   How complicated this change would be in the `evolve`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455061714


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   Thanks!
   Let me take a look



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455056623


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -200,27 +198,27 @@ private void commitPendingRequests(
   throws IOException {
 long checkpointId = commitRequestMap.lastKey();
 List manifests = Lists.newArrayList();
-NavigableMap> pendingResults = Maps.newTreeMap();
+NavigableMap pendingResults = Maps.newTreeMap();
 for (Map.Entry>> e : 
commitRequestMap.entrySet()) {
+  WriteResult.Builder builder = WriteResult.builder();
   for (CommitRequest committable : e.getValue()) {
-if (Arrays.equals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifest())) {
-  pendingResults
-  .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
-  .add(EMPTY_WRITE_RESULT);
-} else {
-  DeltaManifests deltaManifests =
-  SimpleVersionedSerialization.readVersionAndDeSerialize(
-  DeltaManifestsSerializer.INSTANCE, 
committable.getCommittable().manifest());
-  pendingResults
-  .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
-  .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, 
table.io(), table.specs()));
-  manifests.addAll(deltaManifests.manifests());
+if (!Arrays.deepEquals(EMPTY_MANIFEST_DATA, 
committable.getCommittable().manifests())) {
+  for (byte[] manifest : committable.getCommittable().manifests()) {
+DeltaManifests deltaManifests =
+SimpleVersionedSerialization.readVersionAndDeSerialize(
+DeltaManifestsSerializer.INSTANCE, manifest);
+builder.add(
+FlinkManifestUtil.readCompletedFiles(deltaManifests, 
table.io(), table.specs()));
+manifests.addAll(deltaManifests.manifests());
+  }
 }
+
+pendingResults.put(e.getKey(), builder.build());
   }
 }
 
-CommitSummary summary = new CommitSummary();
-summary.addAll(pendingResults);
+// TODO: Fix aggregated commit summary logged multiple times per each each 
checkpoint commit

Review Comment:
   I don't get this comment. Could you please elaborate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455035563


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -155,6 +152,7 @@ public void 
commit(Collection> commitRequests)
 }
   }
 
+  /** TODO: Reuse {@link 
org.apache.iceberg.flink.sink.SinkUtil#getMaxCommittedCheckpointId} * */

Review Comment:
   Why is this for the future?
   
   This code is a duplication which we should remove here, or maybe even in a 
different commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455028357


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -119,6 +110,11 @@ public void 
commit(Collection> commitRequests)
   return;
 }
 
+/*
+ TODO: Replace List> with a single 
CommitRequest

Review Comment:
   We could still have multiple DynamicCommittable for a checkpoint - when we 
have incoming data for multiple branches. So we still need to have a list. Am I 
right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455011962


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -119,6 +110,11 @@ public void 
commit(Collection> commitRequests)
   return;
 }
 
+/*
+ TODO: Replace List> with a single 
CommitRequest

Review Comment:
   Is this because we still need to be prepared for the old state, where we 
have a List of committables?
   
   Could we add this to the comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455016399


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -155,6 +152,7 @@ public void 
commit(Collection> commitRequests)
 }
   }
 
+  /** TODO: Reuse {@link 
org.apache.iceberg.flink.sink.SinkUtil#getMaxCommittedCheckpointId} * */

Review Comment:
   This is for the future when the state only contains committables serialized 
by the new Aggregator?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2455008466


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##
@@ -141,6 +137,7 @@ public void 
commit(Collection> commitRequests)
   getMaxCommittedCheckpointId(
   table, last.jobId(), last.operatorId(), entry.getKey().branch());
   // Mark the already committed FilesCommittable(s) as finished
+  // TODO: Add more logging for visibility on skipped commit requests

Review Comment:
   Could we fix this TODO?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-23 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2454238360


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   @pvary I rebased this PR on top of the latest changes, so it should be good 
to review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-21 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2450011710


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   > Is there anything which would prevent you to use the DynamicSink in 1.10.1?
   
   Not really, just highlighting that we can easily add this to 1.10.1 as well. 
I am also okay with this being included in 1.11 if that's the established 
practice.
   
   Looking forward to merging this change =)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-21 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2450011710


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   > Is there anything which would prevent you to use the DynamicSink in 1.10.1?
   
   Not really, just highlighting that we can easily add this to 1.10.1 as well. 
I am also okay with this being included in 1.11 if that's the established 
practice.
   Looking forward to merging this change =)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-21 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2450011710


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   > Is there anything which would prevent you to use the DynamicSink in 1.10.1?
   
   Not really, just highlighting that we can easily add this to 1.10.1 as well. 
I am also okay with this being included in 1.11 if that's the established 
practice.
   Looking forward to merging this =)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-21 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2447236047


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   Patch releases are usually for bugfixes. 
   
   #14182 is already scheduled for 1.10.1, so that correctness part will be 
fixed.
   If I understand correctly, this PR is an optimization at this point, but I 
might be wrong. Is there anything which would prevent you from using the 
DynamicSink in 1.10.1? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-21 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2447130079


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   > This change will be released in a few months. So other users might have to 
wait to get your feature. 
   
   @pvary @Guosmilesmile We can now include this change in Iceberg 1.10.1 
because it supports previous serialised states and is backward compatible. So 
users will be able to upgrade seamlessly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-20 Thread via GitHub


Guosmilesmile commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2444826438


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   Personally, a seamless upgrade feels like a big win to me (as long as it 
doesn’t require a huge engineering effort). 
   When users have to perform extra steps during an upgrade, it’s confusing—and 
even scary—because they can’t tell whether the extra work is caused by this 
feature or by something else entirely. If we can make the upgrade invisible, we 
spare them that confusion and fear, which is awesome. 
   So IMHO it’s worth the effort.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-20 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2444757010


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   This change will be released in a few months. So other users might have to 
wait to get your feature. I think this could mean we should try to serve their 
needs and allow path for easy upgrade. Adding a new serializer is not a big 
issue here, so IMHO it would worth the effort.
   But that's just me, let's see what others think.
   CC: @mxm, @Guosmilesmile
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-20 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2444125269


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   > I'm leaning towards option 1, because I'm a bit skeptical about other 
serialization methods, and I think we will need longer time to agree on a way 
to move forward.
   
   Sure, I added a new version and a method to deserialize the previous 
committable version to achieve this.
   
   > One argument against it is that the multiple manifest serialization 
doesn't add too much performance gain for us.
   
   We get a performance gain from this change by not having to write a new 
manifest for each unique `schemaId`, `upsertMode`, and `equalityFields` 
combination in the `WriteTarget`, but only for unique `specIds`.
   
   > In the past, we have always made sure, that users can upgrade their job to 
a newer Iceberg version without dropping the state. This is important for long 
running jobs, where in-place upgrade is critical.
   
   In my opinion, this approach makes sense for battle-tested and 
well-established APIs. The `DynamicIcebergSink` API was only released a few 
weeks ago and would not have as many users relying on it. I think quick 
iteration and resolution of issues are more beneficial at the early stages of 
this code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-20 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2444125269


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and 
return the manifest's
+   * Write all the completed data files to a newly created manifest files and 
return the manifests'
* avro serialized bytes.
*/
   @VisibleForTesting
-  byte[] writeToManifest(
-  WriteTarget key, Collection writeResults, long 
checkpointId)
+  byte[][] writeToManifests(
+  String tableName, Collection writeResults, long 
checkpointId)

Review Comment:
   > I'm leaning towards option 1, because I'm a bit skeptical about other 
serialization methods, and I think we will need longer time to agree on a way 
to move forward.
   
   Sure, I added a new version and a method to deserialize the previous 
committable version.
   
   > One argument against it is that the multiple manifest serialization 
doesn't add too much performance gain for us.
   
   We get a performance gain from this change by not having to write a new 
manifest for each unique `schemaId`, `upsertMode`, and `equalityFields` 
combination in the `WriteTarget`, but only for unique `specIds`.
   
   > In the past, we have always made sure, that users can upgrade their job to 
a newer Iceberg version without dropping the state. This is important for long 
running jobs, where in-place upgrade is critical.
   
   In my opinion, this approach makes sense for battle-tested and 
well-established APIs. The `DynamicIcebergSink` API was only released a few 
weeks ago and would not have as many users relying on it. I think quick 
iteration and resolution of issues are more beneficial at the early stages of 
this code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-18 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2428698701


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   Will take a look soon.
   
   QQ in the meantime: How do we handle Iceberg version change for long running 
jobs?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-18 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2427677246


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   Yes, a manifest can only contain files for a single partition spec.
   Yes, the `RowDelta` writes multiple manifests behind the scenes. It keeps 
track of `DataFile` and `DeleteFile` by their partition specs. See the 
implementation of 
[MergingSnapshotProducer](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L87):
   ```java
  // update data
  private final Map newDataFilesBySpec = Maps.newHashMap();
  private Long newDataFilesDataSequenceNumber;
  private final Map newDeleteFilesBySpec = Maps.newHashMap();
   ```
   
   The current implementation implicitly writes a new temporary manifest for 
each unique `WriteTarget`, which creates multiple `DynamicCommittables` per 
(table, branch, checkpoint) triplet (incorrect behaviour). See the 
implementation of 
[DynamicWriteResultAggregator](https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java#L101).
   
   With this refactor, we write even fewer manifests (only for unique partition 
specs), which makes the implementation explicit:
   1. Create multiple manifests only for different partition specs (similar to 
`RowDelta`)
   2. Create only one `DynamicCommittable` per checkpoint, and use multiple 
manifests for serialisation
   3. Remove all assumptions of multiple commit requests in `DynamicCommitter`
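   To make the per-spec grouping concrete, here is a minimal sketch of the idea, assuming a hypothetical helper name and temp-file path and leaving out delete files, metrics, and the real output-file factory; it is not the PR's actual code:
   ```java
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/** Sketch only: group completed data files by partition spec id and write one manifest per spec. */
class PerSpecManifestSketch {

  // Hypothetical helper; the real aggregator also handles delete files and uses its own
  // output file factory instead of a raw path under the table location.
  static List<ManifestFile> writeManifestsPerSpec(Table table, Collection<DataFile> completedFiles)
      throws IOException {
    // Bucket the completed files by the spec they were written with.
    Map<Integer, List<DataFile>> filesBySpecId = Maps.newHashMap();
    for (DataFile file : completedFiles) {
      filesBySpecId.computeIfAbsent(file.specId(), id -> Lists.newArrayList()).add(file);
    }

    // One temporary manifest per distinct partition spec.
    List<ManifestFile> manifests = Lists.newArrayList();
    for (Map.Entry<Integer, List<DataFile>> entry : filesBySpecId.entrySet()) {
      PartitionSpec spec = table.specs().get(entry.getKey());
      OutputFile outputFile =
          table.io().newOutputFile(table.location() + "/tmp/" + UUID.randomUUID() + ".avro");
      // Format version 2 is pinned here, mirroring how the sink writes its temporary manifests.
      ManifestWriter<DataFile> writer = ManifestFiles.write(2, spec, outputFile, null);
      try {
        entry.getValue().forEach(writer::add);
      } finally {
        writer.close();
      }
      manifests.add(writer.toManifestFile());
    }

    return manifests;
  }
}
   ```
   With this shape, several schema versions writing into the same partition spec share a single manifest, which is where the reduction in temporary manifests comes from.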






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-18 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2435490550


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   I need some time to think through the implications, and how we handled these 
situations in the past. 






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-18 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2426103810


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   We need to create a separate manifest file for each partition spec according 
to the [Iceberg spec](https://iceberg.apache.org/spec/#manifests):
   
   > A manifest stores files for a single partition spec. When a table’s 
partition spec changes, old files remain in the older manifest and newer files 
are written to a new manifest. This is required because a manifest file’s 
schema is based on its partition spec (see below).
   
   I attempted to hack this and write a single `ManifestFile` with multiple 
`DataFiles`/`DeleteFiles` using different partition specs. This approach 
resulted in incorrect partition specs returned when reading the manifest back 
using the `ManifestReader`.
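   For reference, a minimal sketch of that single-spec binding using the core `ManifestFiles` API directly; the method name and arguments are made up for illustration:
   ```java
import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFile;

/** Sketch only: a manifest is bound to a single partition spec at both write and read time. */
class SingleSpecManifestSketch {

  static PartitionSpec writeThenReadSpec(
      Table table, PartitionSpec spec, Iterable<DataFile> files, OutputFile out) throws IOException {
    // The writer is created for exactly one spec; the manifest schema is derived from it.
    ManifestWriter<DataFile> writer = ManifestFiles.write(2, spec, out, null);
    try {
      files.forEach(writer::add);
    } finally {
      writer.close();
    }
    ManifestFile manifest = writer.toManifestFile();

    // Reading resolves every entry against that one spec, so files added under a different
    // spec would come back misinterpreted; hence one manifest per partition spec.
    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io(), table.specs())) {
      return reader.spec(); // always the spec the writer was created with
    }
  }
}
   ```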






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-18 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2434774536


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   What do you think?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-18 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2431136207


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   > Both the current code and the refactor use ManifestReader/Writer with a 
hard-coded version 2 of the Iceberg format for committable state serialisation 
in a checkpoint. See 
[FlinkManifestUtil](https://github.com/apache/iceberg/blob/main/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java#L44).
   
   But we change the DynamicCommittable to contain TableKey instead of 
WriteTarget and a byte[][] instead of byte[].
   
   Should we add a new version to the DynamicCommittableSerializer?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-18 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2430867538


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   > Will take a look soon.
   
   Thanks, I'd appreciate it.
   
   > QQ in the meantime: How do we handle Iceberg version change for long 
running jobs?
   
   Both the current code and the refactor use `ManifestReader`/`Writer` with a 
hard-coded version 2 of the Iceberg format for committable state serialisation 
in a checkpoint. See 
[FlinkManifestUtil](https://github.com/apache/iceberg/blob/main/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java#L44).






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-18 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2438463916


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   In the past, we have always made sure that users can upgrade their job to a newer Iceberg version without dropping the state. This is important for long-running jobs, where in-place upgrade is critical.
   
   I think here we should follow the same pattern. If we change how we store data in the state, then we need to make sure that the old state can be read. This is done by versioning the serializer. The groundwork is there, and we need to use it.
   
   I understand it is extra work to do if we want to change the serialization 
again, but I'm still not convinced that we have a good solution to that problem.
   
   I see 2 options:
   1. Implement a serialization for the multiple manifests now, and remove it 
if we change it again before the next release 
   2. Block this PR until we agree upon the next serialization solution.
   
   I'm leaning towards option 1, because I'm a bit skeptical about other 
serialization methods, and I think we will need longer time to agree on a way 
to move forward.
   
   One argument against it is that the multiple manifest serialization doesn't 
add too much performance gain for us. It "just" helps by simplifying the 
committer code.
   
   Your thoughts?
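   For illustration, a minimal sketch of that versioning pattern with Flink's `SimpleVersionedSerializer`, using a hypothetical list-of-strings payload instead of the sink's real committable; `serialize` always writes the newest layout, while `deserialize` keeps a branch for every layout ever written:
   ```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

/** Sketch only: write the current layout, keep old layouts readable. */
class VersionedPayloadSerializer implements SimpleVersionedSerializer<List<String>> {

  @Override
  public int getVersion() {
    return 2; // version of the layout written by serialize()
  }

  @Override
  public byte[] serialize(List<String> values) throws IOException {
    DataOutputSerializer out = new DataOutputSerializer(64);
    out.writeInt(values.size());
    for (String value : values) {
      out.writeUTF(value);
    }
    return out.getCopyOfBuffer();
  }

  @Override
  public List<String> deserialize(int version, byte[] serialized) throws IOException {
    DataInputDeserializer in = new DataInputDeserializer(serialized);
    switch (version) {
      case 1:
        // The (hypothetical) old layout stored a single value; adapt it to the new shape
        // so state written by an older job version still restores.
        return Collections.singletonList(in.readUTF());
      case 2: {
        int size = in.readInt();
        List<String> values = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
          values.add(in.readUTF());
        }
        return values;
      }
      default:
        throw new IOException("Unknown serializer version: " + version);
    }
  }
}
   ```
   State written by a job on the old layout restores through the version-1 branch, so an in-place upgrade does not require dropping the state.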






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-17 Thread via GitHub


pvary commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2426156572


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   Let me confirm:
   - A manifest can only contain files for a single partition spec
   - The `RowDelta` operation could commit files for multiple partition specs?
   
   How does the `RowDelta` do this? Does it create multiple manifest files 
behind the scenes?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-15 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2431623073


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   Why do we need to maintain checkpoint state backwards compatibility, given 
this would go into Iceberg 1.11.0?
   I can see that the `DynamicIcebergSink` is annotated with `@Experimental`, 
which doesn't promise any compatibility guarantees.
   
   I am also planning to change the committable state serialisation a couple 
more times in the following commits to replace manifest serialisation 
completely. If we want to keep backwards compatibility, should we introduce a 
new version in the last commit?






Re: [PR] Flink: Refactor WriteResult aggregation in DynamicIcebergSink [iceberg]

2025-10-13 Thread via GitHub


aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2427677246


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
   }
 
   /**
-   * Write all the completed data files to a newly created manifest file and return the manifest's
+   * Write all the completed data files to a newly created manifest files and return the manifests'
    * avro serialized bytes.
    */
   @VisibleForTesting
-  byte[] writeToManifest(
-      WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+  byte[][] writeToManifests(
+      String tableName, Collection<DynamicWriteResult> writeResults, long checkpointId)

Review Comment:
   Yes, a manifest can only contain files for a single partition spec.
   Yes, the `RowDelta` writes multiple manifests behind the scenes. It keeps 
track of `DataFile` and `DeleteFile` by their partition specs. See the 
implementation of 
[MergingSnapshotProducer](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L87):
   ```java
  // update data
  private final Map newDataFilesBySpec = Maps.newHashMap();
  private Long newDataFilesDataSequenceNumber;
  private final Map newDeleteFilesBySpec = Maps.newHashMap();
   ```
   
   The current implementation of 
[DynamicWriteResultAggregator](https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java#L101)
 implicitly writes a new temporary manifest for each unique `WriteTarget`, 
which creates multiple `DynamicCommittables` per (table, branch, checkpoint) 
triplet (incorrect behaviour).
   
   With this refactor, we write even fewer manifests (only for unique partition 
specs), which makes the implementation explicit:
   1. Create multiple manifests only for different partition specs (similar to 
`RowDelta`)
   2. Create only one `DynamicCommittable` per checkpoint, and use multiple 
manifests for serialisation
   3. Remove all assumptions of multiple commit requests in `DynamicCommitter`


