Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-17 Thread via GitHub


pvary commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-4266963634

   Merged to main.
   Thanks @sqd for the PR and @jordepic, @aiborodin and @mxm for the review.
   
   Please don't forget to backport the PR to the other Flink branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-17 Thread via GitHub


pvary merged PR #15433:
URL: https://github.com/apache/iceberg/pull/15433





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-16 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3094007951


##
docs/docs/flink-writes.md:
##
@@ -547,6 +546,28 @@ The Dynamic Iceberg Flink Sink is configured using the 
Builder pattern. Here are
 | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new 
Iceberg tables, allows overriding how tables are created - setting custom table 
properties and location based on the table name. |
 | `dropUnusedColumns(boolean enabled)` | When enabled, drops 
all columns from the current table schema which are not contained in the input 
schema (see the caveats above on dropping columns).  |
 
+### Distribution Modes
+
+The `DistributionMode` set on each `DynamicRecord` controls how that record is 
routed from the processor to the writer:
+
+| Mode  | Behavior |
+|---|--|
+| `NONE`| Records are distributed across writer subtasks in a 
round-robin fashion (or by equality fields if set). |
+| `HASH`| Records are distributed by partition key (partitioned 
tables) or equality fields (unpartitioned tables). Ensures that records for the 
same partition are handled by the same writer subtask. |
+| `null` | Forward mode: bypasses distribution entirely and sends records 
directly via a forward edge (see below). |

Review Comment:
   Fixed



##
docs/docs/flink-writes.md:
##
@@ -483,11 +483,10 @@ We need the following information (DynamicRecord) for 
every record:
 | `Schema`   | The schema of the record.   
  |
 | `Spec` | The expected partitioning specification for the record. 
  |
 | `RowData`  | The actual row data to be written.  
  |
-| `DistributionMode` | The distribution mode for writing the record (currently 
supports NONE or HASH).   |
+| `DistributionMode` | The distribution mode for writing the record (NONE, 
HASH or `null`). When `null`, the record won't be shuffled at all. |
 | `Parallelism`  | The maximum number of parallel writers for a given 
table/branch/schema/spec (WriteTarget). |
 | `UpsertMode`   | Overrides this table's write.upsert.enabled (optional). 
  |
 | `EqualityFields`   | The equality fields for the table(optional).
|
-

Review Comment:
   Fixed






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-16 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3094006287


##
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -79,13 +81,18 @@ public class DynamicIcebergSink
   private final Configuration flinkConfig;
   private final int cacheMaximumSize;
 
+  // Set by the builder before sinkTo() — forward writer results to union into 
pre-commit topology
+  @Nullable
+  private final transient DataStream> 
forwardWriteResults;

Review Comment:
   You are right. I removed the annotations and the null handling left over from
   before the refactor.



##
docs/docs/flink-writes.md:
##
@@ -547,6 +546,28 @@ The Dynamic Iceberg Flink Sink is configured using the 
Builder pattern. Here are
 | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new 
Iceberg tables, allows overriding how tables are created - setting custom table 
properties and location based on the table name. |
 | `dropUnusedColumns(boolean enabled)` | When enabled, drops 
all columns from the current table schema which are not contained in the input 
schema (see the caveats above on dropping columns).  |
 
+### Distribution Modes
+
+The `DistributionMode` set on each `DynamicRecord` controls how that record is 
routed from the processor to the writer:
+
+| Mode  | Behavior |
+|---|--|
+| `NONE`| Records are distributed across writer subtasks in a 
round-robin fashion (or by equality fields if set). |
+| `HASH`| Records are distributed by partition key (partitioned 
tables) or equality fields (unpartitioned tables). Ensures that records for the 
same partition are handled by the same writer subtask. |
+| `null` | Forward mode: bypasses distribution entirely and sends records 
directly via a forward edge (see below). |
+
+ Forward Mode
+
+Using the `DynamicRecord` constructor overload without `distributionMode` 
parameter bypasses distribution entirely. This is designed for high-throughput 
pipelines where every partition already has a large volume of data and the 
serialization and network shuffle cost is prohibitive. Records are sent 
directly from the processor to the writer using a forward edge, enabling Flink 
operator chaining. Table metadata updates are always performed immediately 
inside the processor (regardless of `immediateTableUpdate` setting), because a 
dedicated table-update operator was deliberately omitted to avoid introducing 
extra data shuffles.
+
+Forward and regular records can be mixed in the same pipeline. The processor 
routes records to two separate sink outputs:
+
+- **Shuffle sink**: receives shuffling records. These go through the normal 
distribution topology (hash/round-robin) before reaching the writer.
+- **Forward sink**: receives records without a `distributionMode`. These skip 
distribution entirely and flow via a forward edge from the processor, allowing 
Flink operator chaining. Suited for high-throughput tables where avoiding 
shuffle overhead is critical.
+
+!!! warning

Review Comment:
   Fixed
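A rough sketch of the routing rule described in the quoted docs, in plain Java rather than Flink: a record without a distribution mode goes to the forward output, and every other record goes to the shuffle output. All names here (`RoutingSketch`, `Rec`, `Mode`, `route`) are hypothetical stand-ins and not the Iceberg classes; the real processor emits to Flink outputs, not to lists.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the routing rule; none of these types are Iceberg classes.
final class RoutingSketch {
  enum Mode { NONE, HASH }

  // Minimal stand-in for a record that carries an optional distribution mode.
  static final class Rec {
    final String table;
    final Mode mode; // null means forward mode

    Rec(String table, Mode mode) {
      this.table = table;
      this.mode = mode;
    }
  }

  final List<Rec> shuffleOutput = new ArrayList<>();
  final List<Rec> forwardOutput = new ArrayList<>();

  // Records without a mode skip distribution; all others take the shuffle path.
  void route(Rec rec) {
    if (rec.mode == null) {
      forwardOutput.add(rec);
    } else {
      shuffleOutput.add(rec);
    }
  }

  public static void main(String[] args) {
    RoutingSketch sketch = new RoutingSketch();
    sketch.route(new Rec("t1", null));
    sketch.route(new Rec("t1", Mode.HASH));
    sketch.route(new Rec("t2", Mode.NONE));
    System.out.println("forward=" + sketch.forwardOutput.size());
    System.out.println("shuffle=" + sketch.shuffleOutput.size());
  }
}
```

Mixing both kinds of records in one pipeline then amounts to calling `route` on each record as it arrives.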






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-16 Thread via GitHub


pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3092814476


##
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -79,13 +81,18 @@ public class DynamicIcebergSink
   private final Configuration flinkConfig;
   private final int cacheMaximumSize;
 
+  // Set by the builder before sinkTo() — forward writer results to union into 
pre-commit topology
+  @Nullable
+  private final transient DataStream> 
forwardWriteResults;

Review Comment:
   I don't get this. Isn't this always set?






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-16 Thread via GitHub


pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3092704616


##
docs/docs/flink-writes.md:
##
@@ -547,6 +546,28 @@ The Dynamic Iceberg Flink Sink is configured using the 
Builder pattern. Here are
 | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new 
Iceberg tables, allows overriding how tables are created - setting custom table 
properties and location based on the table name. |
 | `dropUnusedColumns(boolean enabled)` | When enabled, drops 
all columns from the current table schema which are not contained in the input 
schema (see the caveats above on dropping columns).  |
 
+### Distribution Modes
+
+The `DistributionMode` set on each `DynamicRecord` controls how that record is 
routed from the processor to the writer:
+
+| Mode  | Behavior |
+|---|--|
+| `NONE`| Records are distributed across writer subtasks in a 
round-robin fashion (or by equality fields if set). |
+| `HASH`| Records are distributed by partition key (partitioned 
tables) or equality fields (unpartitioned tables). Ensures that records for the 
same partition are handled by the same writer subtask. |
+| `null` | Forward mode: bypasses distribution entirely and sends records 
directly via a forward edge (see below). |
+
+ Forward Mode
+
+Using the `DynamicRecord` constructor overload without `distributionMode` 
parameter bypasses distribution entirely. This is designed for high-throughput 
pipelines where every partition already has a large volume of data and the 
serialization and network shuffle cost is prohibitive. Records are sent 
directly from the processor to the writer using a forward edge, enabling Flink 
operator chaining. Table metadata updates are always performed immediately 
inside the processor (regardless of `immediateTableUpdate` setting), because a 
dedicated table-update operator was deliberately omitted to avoid introducing 
extra data shuffles.
+
+Forward and regular records can be mixed in the same pipeline. The processor 
routes records to two separate sink outputs:
+
+- **Shuffle sink**: receives shuffling records. These go through the normal 
distribution topology (hash/round-robin) before reaching the writer.
+- **Forward sink**: receives records without a `distributionMode`. These skip 
distribution entirely and flow via a forward edge from the processor, allowing 
Flink operator chaining. Suited for high-throughput tables where avoiding 
shuffle overhead is critical.
+
+!!! warning

Review Comment:
   I think we should also add that direct forwarding can result in unbalanced
   writes, and that users are responsible for distributing the data correctly.
   
   We should also mention that the `Parallelism` setting is ignored in this mode.
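To make the imbalance concern concrete, here is a small plain-Java simulation, not Flink code. It assumes a forward edge hands each writer subtask exactly what its upstream subtask produced, and it approximates round-robin distribution by spreading the total evenly. The class and method names (`ForwardSkewDemo`, `forward`, `roundRobin`) and the input counts are made up for illustration.

```java
import java.util.Arrays;

// Hypothetical simulation, not Flink code: counts records per writer subtask.
final class ForwardSkewDemo {
  private ForwardSkewDemo() {}

  // Forward edge: each writer subtask receives exactly what its upstream subtask produced.
  static int[] forward(int[] recordsPerSourceSubtask) {
    return recordsPerSourceSubtask.clone();
  }

  // Simplified round-robin: the total is dealt out one record at a time across writers.
  static int[] roundRobin(int[] recordsPerSourceSubtask, int writers) {
    int total = Arrays.stream(recordsPerSourceSubtask).sum();
    int[] perWriter = new int[writers];
    for (int i = 0; i < total; i++) {
      perWriter[i % writers]++;
    }
    return perWriter;
  }

  public static void main(String[] args) {
    int[] skewedInput = {900, 50, 50};
    System.out.println(Arrays.toString(forward(skewedInput)));
    System.out.println(Arrays.toString(roundRobin(skewedInput, 3)));
  }
}
```

With a skewed input, the forward path keeps the skew at the writers, which is why the upstream job would have to balance the data itself.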






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-16 Thread via GitHub


pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3092696315


##
docs/docs/flink-writes.md:
##
@@ -547,6 +546,28 @@ The Dynamic Iceberg Flink Sink is configured using the 
Builder pattern. Here are
 | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new 
Iceberg tables, allows overriding how tables are created - setting custom table 
properties and location based on the table name. |
 | `dropUnusedColumns(boolean enabled)` | When enabled, drops 
all columns from the current table schema which are not contained in the input 
schema (see the caveats above on dropping columns).  |
 
+### Distribution Modes
+
+The `DistributionMode` set on each `DynamicRecord` controls how that record is 
routed from the processor to the writer:
+
+| Mode  | Behavior |
+|---|--|
+| `NONE`| Records are distributed across writer subtasks in a 
round-robin fashion (or by equality fields if set). |
+| `HASH`| Records are distributed by partition key (partitioned 
tables) or equality fields (unpartitioned tables). Ensures that records for the 
same partition are handled by the same writer subtask. |
+| `null` | Forward mode: bypasses distribution entirely and sends records 
directly via a forward edge (see below). |

Review Comment:
   nit: Please pad the cells with spaces so that the table columns line up,
   since all of these lines are added in this PR.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-16 Thread via GitHub


pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3092693249


##
docs/docs/flink-writes.md:
##
@@ -483,11 +483,10 @@ We need the following information (DynamicRecord) for 
every record:
 | `Schema`   | The schema of the record.   
  |
 | `Spec` | The expected partitioning specification for the record. 
  |
 | `RowData`  | The actual row data to be written.  
  |
-| `DistributionMode` | The distribution mode for writing the record (currently 
supports NONE or HASH).   |
+| `DistributionMode` | The distribution mode for writing the record (NONE, 
HASH or `null`). When `null`, the record won't be shuffled at all. |
 | `Parallelism`  | The maximum number of parallel writers for a given 
table/branch/schema/spec (WriteTarget). |
 | `UpsertMode`   | Overrides this table's write.upsert.enabled (optional). 
  |
 | `EqualityFields`   | The equality fields for the table(optional).
|
-

Review Comment:
   nit: unnecessary change. Please revert






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-15 Thread via GitHub


sqd commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-4253484796

   Moved the changes to Flink 2.1





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-15 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3087700453


##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -238,6 +291,117 @@ void testWrite() throws Exception {
 runTest(rows);
   }
 
+  @Test
+  void testNoShuffleTopology() throws Exception {
+DataStream dataStream =
+env.fromData(
+Collections.emptyList(), TypeInformation.of(new 
TypeHint() {}));
+DynamicIcebergSink.forInput(dataStream)
+.generator(new ForwardGenerator())
+.catalogLoader(CATALOG_EXTENSION.catalogLoader())
+.writeParallelism(2)
+.immediateTableUpdate(false)
+.overwrite(false)
+.append();
+
+boolean generatorAndSinkChained = false;
+for (JobVertex vertex : env.getStreamGraph().getJobGraph().getVertices()) {
+  boolean generatorInThisVertex = false;
+  boolean sinkInThisVertex = false;
+  for (OperatorIDPair operatorID : vertex.getOperatorIDs()) {
+String uid = operatorID.getUserDefinedOperatorUid();
+if (uid == null) {
+  continue;
+}
+
+if (uid.endsWith("-forward-writer")) {
+  sinkInThisVertex = true;
+} else if (uid.endsWith("-generator")) {
+  generatorInThisVertex = true;
+}
+  }
+
+  generatorAndSinkChained = generatorInThisVertex && sinkInThisVertex;
+  if (generatorAndSinkChained) {
+break;
+  }
+}
+
+assertThat(generatorAndSinkChained).isTrue();
+  }
+
+  @Test
+  void testForwardWrite() throws Exception {
+List rows =
+Lists.newArrayList(
+new DynamicIcebergDataImpl(
+SimpleDataUtil.SCHEMA,
+"t1",
+SnapshotRef.MAIN_BRANCH,
+PartitionSpec.unpartitioned()),
+new DynamicIcebergDataImpl(
+SimpleDataUtil.SCHEMA,
+"t1",
+SnapshotRef.MAIN_BRANCH,
+PartitionSpec.unpartitioned()));
+
+DataStream dataStream =
+env.fromData(rows, TypeInformation.of(new TypeHint<>() {}));
+env.setParallelism(1);
+
+DynamicIcebergSink.forInput(dataStream)
+.generator(new ForwardGenerator())
+.catalogLoader(CATALOG_EXTENSION.catalogLoader())
+.writeParallelism(1)
+.immediateTableUpdate(true)
+.append();
+
+env.execute("Test Forward Write");
+
+verifyResults(rows);
+  }
+
+  @Test
+  void testMixedForwardAndShuffleWrite() throws Exception {
+List rows =
+Lists.newArrayList(
+new DynamicIcebergDataImpl(
+SimpleDataUtil.SCHEMA,
+"t1",
+SnapshotRef.MAIN_BRANCH,
+PartitionSpec.unpartitioned()),
+new DynamicIcebergDataImpl(
+SimpleDataUtil.SCHEMA,
+"t1",
+SnapshotRef.MAIN_BRANCH,
+PartitionSpec.unpartitioned()),
+new DynamicIcebergDataImpl(
+SimpleDataUtil.SCHEMA,
+"t1",
+SnapshotRef.MAIN_BRANCH,
+PartitionSpec.unpartitioned()),
+new DynamicIcebergDataImpl(
+SimpleDataUtil.SCHEMA,
+"t1",
+SnapshotRef.MAIN_BRANCH,
+PartitionSpec.unpartitioned()));
+
+DataStream dataStream =
+env.fromData(rows, TypeInformation.of(new TypeHint<>() {}));
+env.setParallelism(1);
+
+DynamicIcebergSink.forInput(dataStream)
+.generator(new MixedGenerator())
+.catalogLoader(CATALOG_EXTENSION.catalogLoader())
+.writeParallelism(1)
+.immediateTableUpdate(true)
+.append();
+
+env.execute("Test Mixed Forward and Shuffle Write");
+
+verifyResults(rows);
+  }

Review Comment:
   Done






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-15 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3087694992


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java:
##
@@ -159,33 +174,37 @@ public void collect(DynamicRecord data) {
   }
 } else {
   emit(
-  collector,
   data,
   foundSchema.resolvedTableSchema(),
   foundSchema.recordConverter(),
-  foundSpec);
+  foundSpec,
+  isForward);
 }
   }
 
   private void emit(
-  Collector out,
   DynamicRecord data,
   Schema schema,
   DataConverter recordConverter,
-  PartitionSpec spec) {
+  PartitionSpec spec,
+  boolean forward) {
 RowData rowData = (RowData) recordConverter.convert(data.rowData());
-int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData);
-String tableName = data.tableIdentifier().toString();
-out.collect(
+int writerKey = forward ? 0 : hashKeyGenerator.generateKey(data, schema, 
spec, rowData);

Review Comment:
   Done






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-15 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3087697553


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -357,43 +416,79 @@ private String operatorName(String suffix) {
   return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
 }
 
-private DynamicIcebergSink build() {
+private DynamicIcebergSink build(
+SingleOutputStreamOperator converted,
+DynamicRecordInternalType sideOutputType) {
 
   Preconditions.checkArgument(
   generator != null, "Please use withGenerator() to convert the input 
DataStream.");
   Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be 
null");
 
-  uidPrefix = Optional.ofNullable(uidPrefix).orElse("");
-
   Configuration flinkConfig =
   readableConfig instanceof Configuration
   ? (Configuration) readableConfig
   : Configuration.fromMap(readableConfig.toMap());
 
-  return instantiateSink(writeOptions, flinkConfig);
+  // Forward writer: chained with generator via forward edge, no data 
shuffle
+  ForwardWriterSink forwardWriterSink =
+  new ForwardWriterSink(catalogLoader, writeOptions, flinkConfig, 
cacheMaximumSize);
+  TypeInformation> 
writeResultTypeInfo =
+  CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new);
+
+  DataStream> forwardWriteResults =
+  converted
+  .getSideOutput(
+  new 
OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
+  .transform(
+  operatorName("Forward-Writer"),
+  writeResultTypeInfo,
+  new SinkWriterOperatorFactory<>(forwardWriterSink))
+  .uid(prefixIfNotNull(uidPrefix, "-forward-writer"));

Review Comment:
   Now using `containsOnly`, and added the uid for the forward writer






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-15 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3087699157


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -79,13 +80,17 @@ public class DynamicIcebergSink
   private final Configuration flinkConfig;
   private final int cacheMaximumSize;
 
+  // Set by the builder before sinkTo() — forward writer results to union into 
pre-commit topology
+  private final transient DataStream> 
forwardWriteResults;

Review Comment:
   Done



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -79,13 +80,17 @@ public class DynamicIcebergSink
   private final Configuration flinkConfig;
   private final int cacheMaximumSize;
 
+  // Set by the builder before sinkTo() — forward writer results to union into 
pre-commit topology
+  private final transient DataStream> 
forwardWriteResults;
+
   DynamicIcebergSink(
   CatalogLoader catalogLoader,
   Map snapshotProperties,
   String uidPrefix,
   Map writeProperties,
   Configuration flinkConfig,
-  int cacheMaximumSize) {
+  int cacheMaximumSize,
+  DataStream> forwardWriteResults) {

Review Comment:
   Done






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-15 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3085320269


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java:
##
@@ -159,33 +174,37 @@ public void collect(DynamicRecord data) {
   }
 } else {
   emit(
-  collector,
   data,
   foundSchema.resolvedTableSchema(),
   foundSchema.recordConverter(),
-  foundSpec);
+  foundSpec,
+  isForward);
 }
   }
 
   private void emit(
-  Collector out,
   DynamicRecord data,
   Schema schema,
   DataConverter recordConverter,
-  PartitionSpec spec) {
+  PartitionSpec spec,
+  boolean forward) {
 RowData rowData = (RowData) recordConverter.convert(data.rowData());
-int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData);
-String tableName = data.tableIdentifier().toString();
-out.collect(
+int writerKey = forward ? 0 : hashKeyGenerator.generateKey(data, schema, 
spec, rowData);

Review Comment:
   ```suggestion
   int writerKey = forward ? -1 : hashKeyGenerator.generateKey(data, 
schema, spec, rowData);
   ```
   
   Use `-1` to indicate that the key is not applicable? It is probably worth adding a comment.
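A minimal sketch of the sentinel idea, in plain Java with hypothetical names (`WriterKeys`, `NOT_APPLICABLE`, `shuffleKey`, `keyFor`). The hash below is only a stand-in for `hashKeyGenerator.generateKey(...)`. Masking the sign bit so that real keys are never negative is an assumption made here to keep the sentinel unambiguous; the thread does not say what range the real generator returns.

```java
import java.util.Objects;

// Hypothetical illustration only: these names are not part of the Iceberg code base.
final class WriterKeys {
  // Sentinel meaning "no writer key": forward records bypass key-based routing.
  static final int NOT_APPLICABLE = -1;

  private WriterKeys() {}

  // Stand-in for the real key generator; clears the sign bit so keys are never negative.
  static int shuffleKey(String tableName, String partition) {
    return Objects.hash(tableName, partition) & Integer.MAX_VALUE;
  }

  // Forward records get the sentinel; shuffled records get a computed key.
  static int keyFor(boolean forward, String tableName, String partition) {
    return forward ? NOT_APPLICABLE : shuffleKey(tableName, partition);
  }

  public static void main(String[] args) {
    System.out.println(keyFor(true, "db.t1", "2026-04-15"));
    System.out.println(keyFor(false, "db.t1", "2026-04-15") >= 0);
  }
}
```

A named constant carries the "not applicable" meaning at the call site, which covers the request for a comment.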






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-04-15 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3085223042


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -79,13 +80,17 @@ public class DynamicIcebergSink
   private final Configuration flinkConfig;
   private final int cacheMaximumSize;
 
+  // Set by the builder before sinkTo() — forward writer results to union into 
pre-commit topology
+  private final transient DataStream> 
forwardWriteResults;
+
   DynamicIcebergSink(
   CatalogLoader catalogLoader,
   Map snapshotProperties,
   String uidPrefix,
   Map writeProperties,
   Configuration flinkConfig,
-  int cacheMaximumSize) {
+  int cacheMaximumSize,
+  DataStream> forwardWriteResults) {

Review Comment:
   ```suggestion
 @Nullable DataStream> 
forwardWriteResults) {
   ```



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -357,43 +416,79 @@ private String operatorName(String suffix) {
   return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
 }
 
-private DynamicIcebergSink build() {
+private DynamicIcebergSink build(
+SingleOutputStreamOperator converted,
+DynamicRecordInternalType sideOutputType) {
 
   Preconditions.checkArgument(
   generator != null, "Please use withGenerator() to convert the input 
DataStream.");
   Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be 
null");
 
-  uidPrefix = Optional.ofNullable(uidPrefix).orElse("");
-
   Configuration flinkConfig =
   readableConfig instanceof Configuration
   ? (Configuration) readableConfig
   : Configuration.fromMap(readableConfig.toMap());
 
-  return instantiateSink(writeOptions, flinkConfig);
+  // Forward writer: chained with generator via forward edge, no data 
shuffle
+  ForwardWriterSink forwardWriterSink =
+  new ForwardWriterSink(catalogLoader, writeOptions, flinkConfig, 
cacheMaximumSize);
+  TypeInformation> 
writeResultTypeInfo =
+  CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new);
+
+  DataStream> forwardWriteResults =
+  converted
+  .getSideOutput(
+  new 
OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
+  .transform(
+  operatorName("Forward-Writer"),
+  writeResultTypeInfo,
+  new SinkWriterOperatorFactory<>(forwardWriterSink))
+  .uid(prefixIfNotNull(uidPrefix, "-forward-writer"));

Review Comment:
   Could you please check the `testOperatorUidsFormat` test? The new UID of
   the "Forward-Writer" should be added to that test. The test currently uses
   `contains()` logic and needs to be migrated to `containsOnly` so that the
   test fails when we add new operators.
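To illustrate why a `contains()`-style check would not notice a newly added operator, here is a plain-Java sketch that uses `java.util` collections in place of AssertJ. The uid strings and the helper names (`UidChecks`, `containsAll`, `containsOnly`) are made up for illustration and are not taken from the actual test.

```java
import java.util.HashSet;
import java.util.List;

// Hypothetical illustration: mimics the difference between "contains" and "containsOnly" checks.
final class UidChecks {
  private UidChecks() {}

  // Passes as long as every expected uid is present; extra uids go unnoticed.
  static boolean containsAll(List<String> actual, List<String> expected) {
    return actual.containsAll(expected);
  }

  // Passes only if both collections hold exactly the same set of uids.
  static boolean containsOnly(List<String> actual, List<String> expected) {
    return new HashSet<>(actual).equals(new HashSet<>(expected));
  }

  public static void main(String[] args) {
    List<String> expected = List.of("p-generator", "p-writer");
    List<String> actual = List.of("p-generator", "p-writer", "p-forward-writer");
    System.out.println(containsAll(actual, expected));
    System.out.println(containsOnly(actual, expected));
  }
}
```

In this sketch the extra `p-forward-writer` uid passes the first check and fails the second, so only the stricter check forces the expected list to be updated.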



##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -238,6 +291,117 @@ void testWrite() throws Exception {
 runTest(rows);
   }
 
+  @Test
+  void testNoShuffleTopology() throws Exception {
+DataStream dataStream =
+env.fromData(
+Collections.emptyList(), TypeInformation.of(new 
TypeHint() {}));
+DynamicIcebergSink.forInput(dataStream)
+.generator(new ForwardGenerator())
+.catalogLoader(CATALOG_EXTENSION.catalogLoader())
+.writeParallelism(2)
+.immediateTableUpdate(false)
+.overwrite(false)
+.append();
+
+boolean generatorAndSinkChained = false;
+for (JobVertex vertex : env.getStreamGraph().getJobGraph().getVertices()) {
+  boolean generatorInThisVertex = false;
+  boolean sinkInThisVertex = false;
+  for (OperatorIDPair operatorID : vertex.getOperatorIDs()) {
+String uid = operatorID.getUserDefinedOperatorUid();
+if (uid == null) {
+  continue;
+}
+
+if (uid.endsWith("-forward-writer")) {
+  sinkInThisVertex = true;
+} else if (uid.endsWith("-generator")) {
+  generatorInThisVertex = true;
+}
+  }
+
+  generatorAndSinkChained = generatorInThisVertex && sinkInThisVertex;
+  if (generatorAndSinkChained) {
+break;
+  }
+}
+
+assertThat(generatorAndSinkChained).isTrue();
+  }

Review Comment:
   This makes me wonder whether we should split this class into an integration
   test class and a unit test class. We can do that in a follow-up.



##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -238,6 +291,117 @@ void testWrite() throws Exception {
 runTest(rows);
   }
 
+  @Test
+  void testNoShuffleTopology() throws Exception {
+DataStream dataStream =
+env.fromData(
+Collections.emptyList(), TypeInformation.of(new 
T

Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-31 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3017993183


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -411,12 +488,32 @@ public DataStreamSink append() {
   .name(operatorName("generator"))
   .returns(type);
 
-  DataStreamSink rowDataDataStreamSink =
+  // Forward writer: chained with generator via forward edge, no data 
shuffle
+  ForwardWriterSink forwardWriterSink =
+  new ForwardWriterSink(
+  sink.catalogLoader, sink.writeProperties, sink.flinkConfig, 
sink.cacheMaximumSize);
+  TypeInformation> 
writeResultTypeInfo =
+  CommittableMessageTypeInfo.of(sink::getWriteResultSerializer);
+
+  DataStream> forwardWritten =
+  converted
+  .getSideOutput(
+  new 
OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
+  .transform(
+  operatorName("Forward-Writer"),
+  writeResultTypeInfo,
+  new SinkWriterOperatorFactory<>(forwardWriterSink))
+  .uid(prefixIfNotNull(uidPrefix, "-forward-writer"));
+
+  // Inject forward write results into sink — they'll be unioned in 
addPreCommitTopology
+  sink.setForwardWriteResults(forwardWritten);

Review Comment:
   Fixed






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-31 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3017999040


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer 
getWriteResultSerializer()
 return new DynamicWriteResultSerializer();
   }
 
+  /**
+   * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the 
forward write path.
+   * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} 
emits committables
+   * downstream. The committer is never called — committing is handled by the 
main sink.
+   */
+  @VisibleForTesting
+  static class ForwardWriterSink
+  implements Sink, 
SupportsCommitter {
+
+private final CatalogLoader catalogLoader;
+private final Map writeProperties;
+private final Configuration flinkConfig;
+private final int cacheMaximumSize;
+
+ForwardWriterSink(
+CatalogLoader catalogLoader,
+Map writeProperties,
+Configuration flinkConfig,
+int cacheMaximumSize) {
+  this.catalogLoader = catalogLoader;
+  this.writeProperties = writeProperties;
+  this.flinkConfig = flinkConfig;
+  this.cacheMaximumSize = cacheMaximumSize;
+}
+
+@Override
+public SinkWriter createWriter(WriterInitContext 
context) {
+  return new DynamicWriter(
+  catalogLoader.loadCatalog(),
+  writeProperties,
+  flinkConfig,
+  cacheMaximumSize,
+  new DynamicWriterMetrics(context.metricGroup()),
+  context.getTaskInfo().getIndexOfThisSubtask(),
+  context.getTaskInfo().getAttemptNumber());
+}
+
+@Override
+public Committer createCommitter(CommitterInitContext 
context) {
+  throw new UnsupportedOperationException(
+  "WriterSink is used only for writing; committing is handled by the 
main sink");
+}

Review Comment:
   [This 
logic](https://github.com/apache/flink/blob/release-2.0/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L123)
 requires us to tag the sink with `SupportsCommitter`; otherwise [Flink 
won't make the sink operator 
emit](https://github.com/apache/flink/blob/release-2.0/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L215-L224)
 any committables downstream at all.
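
The gating described in this comment can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToySink`, `ToySupportsCommitter`, and `ToyWriterOperator` are hypothetical stand-ins invented for illustration, not Flink's real `Sink`, `SupportsCommitter`, or `SinkWriterOperator`. It shows only the idea that write results are forwarded downstream when the sink carries the marker interface and are dropped otherwise.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: the nested types are hypothetical stand-ins and are NOT
// Flink's Sink, SupportsCommitter, or SinkWriterOperator.
final class EmitGatingSketch {
  interface ToySink {
    String write(String record);
  }

  // Marker interface playing the role of SupportsCommitter.
  interface ToySupportsCommitter {}

  static final class PlainSink implements ToySink {
    @Override
    public String write(String record) {
      return "file-for-" + record;
    }
  }

  static final class TaggedSink implements ToySink, ToySupportsCommitter {
    @Override
    public String write(String record) {
      return "file-for-" + record;
    }
  }

  static final class ToyWriterOperator {
    private final ToySink sink;
    private final boolean emitDownstream;
    private final List<String> pending = new ArrayList<>();

    ToyWriterOperator(ToySink sink) {
      this.sink = sink;
      // The gate: only sinks tagged with the marker get results forwarded.
      this.emitDownstream = sink instanceof ToySupportsCommitter;
    }

    void process(String record) {
      pending.add(sink.write(record));
    }

    // Models the pre-barrier step: hand pending results downstream or drop them.
    List<String> flushBeforeBarrier() {
      List<String> emitted = emitDownstream ? new ArrayList<>(pending) : new ArrayList<>();
      pending.clear();
      return emitted;
    }
  }

  // Writes two records and returns what a downstream operator would receive.
  static List<String> run(ToySink sink) {
    ToyWriterOperator operator = new ToyWriterOperator(sink);
    operator.process("a");
    operator.process("b");
    return operator.flushBeforeBarrier();
  }

  public static void main(String[] args) {
    System.out.println("plain sink emits:  " + run(new PlainSink()));
    System.out.println("tagged sink emits: " + run(new TaggedSink()));
  }
}
```

In this model the untagged sink still writes both records, but nothing reaches the downstream committer, which is the situation the comment says the `SupportsCommitter` tag avoids.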






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-31 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3017991856


##
docs/docs/flink-writes.md:
##
@@ -483,11 +483,10 @@ We need the following information (DynamicRecord) for 
every record:
 | `Schema`   | The schema of the record.   
  |
 | `Spec` | The expected partitioning specification for the record. 
  |
 | `RowData`  | The actual row data to be written.  
  |
-| `DistributionMode` | The distribution mode for writing the record (currently 
supports NONE or HASH).   |
+| `DistributionMode` | The distribution mode for writing the record (NONE, 
HASH or optional). When unspecified, the record won't be shuffled at all. |

Review Comment:
   Fixed



##
docs/docs/flink-writes.md:
##
@@ -547,6 +546,28 @@ The Dynamic Iceberg Flink Sink is configured using the 
Builder pattern. Here are
 | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new 
Iceberg tables, allows overriding how tables are created - setting custom table 
properties and location based on the table name. |
 | `dropUnusedColumns(boolean enabled)` | When enabled, drops 
all columns from the current table schema which are not contained in the input 
schema (see the caveats above on dropping columns).  |
 
+### Distribution Modes
+
+The `DistributionMode` set on each `DynamicRecord` controls how that record is 
routed from the processor to the writer:
+
+| Mode  | Behavior |
+|---|--|
+| `NONE`| Records are distributed across writer subtasks in a 
round-robin fashion (or by equality fields if set). |
+| `HASH`| Records are distributed by partition key (partitioned 
tables) or equality fields (unpartitioned tables). Ensures that records for the 
same partition are handled by the same writer subtask. |
+| (unspecified) | Forward mode: bypasses distribution entirely and sends 
records directly via a forward edge (see below). |

Review Comment:
   Fixed






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-31 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3016175297


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer 
getWriteResultSerializer()
 return new DynamicWriteResultSerializer();
   }
 
+  /**
+   * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the 
forward write path.
+   * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} 
emits committables
+   * downstream. The committer is never called — committing is handled by the 
main sink.
+   */
+  @VisibleForTesting
+  static class ForwardWriterSink
+  implements Sink, 
SupportsCommitter {
+
+private final CatalogLoader catalogLoader;
+private final Map writeProperties;
+private final Configuration flinkConfig;
+private final int cacheMaximumSize;
+
+ForwardWriterSink(
+CatalogLoader catalogLoader,
+Map writeProperties,
+Configuration flinkConfig,
+int cacheMaximumSize) {
+  this.catalogLoader = catalogLoader;
+  this.writeProperties = writeProperties;
+  this.flinkConfig = flinkConfig;
+  this.cacheMaximumSize = cacheMaximumSize;
+}
+
+@Override
+public SinkWriter createWriter(WriterInitContext 
context) {
+  return new DynamicWriter(
+  catalogLoader.loadCatalog(),
+  writeProperties,
+  flinkConfig,
+  cacheMaximumSize,
+  new DynamicWriterMetrics(context.metricGroup()),
+  context.getTaskInfo().getIndexOfThisSubtask(),
+  context.getTaskInfo().getAttemptNumber());
+}
+
+@Override
+public Committer createCommitter(CommitterInitContext 
context) {
+  throw new UnsupportedOperationException(
+  "WriterSink is used only for writing; committing is handled by the 
main sink");
+}

Review Comment:
   I see now that we need `Sink` when we use `SinkWriterOperatorFactory`, but 
we can remove the `SupportsCommitter`.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-30 Thread via GitHub


jordepic commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3010496222


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer 
getWriteResultSerializer()
 return new DynamicWriteResultSerializer();
   }
 
+  /**
+   * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the 
forward write path.
+   * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} 
emits committables
+   * downstream. The committer is never called — committing is handled by the 
main sink.
+   */
+  @VisibleForTesting
+  static class ForwardWriterSink
+  implements Sink, 
SupportsCommitter {
+
+private final CatalogLoader catalogLoader;
+private final Map writeProperties;
+private final Configuration flinkConfig;
+private final int cacheMaximumSize;
+
+ForwardWriterSink(
+CatalogLoader catalogLoader,
+Map writeProperties,
+Configuration flinkConfig,
+int cacheMaximumSize) {
+  this.catalogLoader = catalogLoader;
+  this.writeProperties = writeProperties;
+  this.flinkConfig = flinkConfig;
+  this.cacheMaximumSize = cacheMaximumSize;
+}
+
+@Override
+public SinkWriter createWriter(WriterInitContext 
context) {
+  return new DynamicWriter(
+  catalogLoader.loadCatalog(),
+  writeProperties,
+  flinkConfig,
+  cacheMaximumSize,
+  new DynamicWriterMetrics(context.metricGroup()),
+  context.getTaskInfo().getIndexOfThisSubtask(),
+  context.getTaskInfo().getAttemptNumber());
+}
+
+@Override
+public Committer createCommitter(CommitterInitContext 
context) {
+  throw new UnsupportedOperationException(
+  "WriterSink is used only for writing; committing is handled by the 
main sink");
+}

Review Comment:
   The `SinkWriterOperator` is what prepares the snapshots for us pre-barrier, so 
I tried to reuse code where possible.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-30 Thread via GitHub


jordepic commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3010480422


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer 
getWriteResultSerializer()
 return new DynamicWriteResultSerializer();
   }
 
+  /**
+   * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the 
forward write path.
+   * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} 
emits committables
+   * downstream. The committer is never called — committing is handled by the 
main sink.
+   */
+  @VisibleForTesting
+  static class ForwardWriterSink
+  implements Sink, 
SupportsCommitter {
+
+private final CatalogLoader catalogLoader;
+private final Map writeProperties;
+private final Configuration flinkConfig;
+private final int cacheMaximumSize;
+
+ForwardWriterSink(
+CatalogLoader catalogLoader,
+Map writeProperties,
+Configuration flinkConfig,
+int cacheMaximumSize) {
+  this.catalogLoader = catalogLoader;
+  this.writeProperties = writeProperties;
+  this.flinkConfig = flinkConfig;
+  this.cacheMaximumSize = cacheMaximumSize;
+}
+
+@Override
+public SinkWriter createWriter(WriterInitContext 
context) {
+  return new DynamicWriter(
+  catalogLoader.loadCatalog(),
+  writeProperties,
+  flinkConfig,
+  cacheMaximumSize,
+  new DynamicWriterMetrics(context.metricGroup()),
+  context.getTaskInfo().getIndexOfThisSubtask(),
+  context.getTaskInfo().getAttemptNumber());
+}
+
+@Override
+public Committer createCommitter(CommitterInitContext 
context) {
+  throw new UnsupportedOperationException(
+  "WriterSink is used only for writing; committing is handled by the 
main sink");
+}

Review Comment:
   I initially did this so that we could wrap it with a 
`SinkWriterOperatorFactory`, ensuring that checkpointing works as 
expected and stays aligned with the other `DynamicWriter`. Do you think this 
is unnecessary? I do agree that implementing `SupportsCommitter` is probably 
unnecessary.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-30 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3009894410


##
docs/docs/flink-writes.md:
##
@@ -483,11 +483,10 @@ We need the following information (DynamicRecord) for 
every record:
 | `Schema`   | The schema of the record.   
  |
 | `Spec` | The expected partitioning specification for the record. 
  |
 | `RowData`  | The actual row data to be written.  
  |
-| `DistributionMode` | The distribution mode for writing the record (currently 
supports NONE or HASH).   |
+| `DistributionMode` | The distribution mode for writing the record (NONE, 
HASH or optional). When unspecified, the record won't be shuffled at all. |

Review Comment:
   I think it is better to be explicit, despite the new constructor which 
somewhat hides this implementation detail. It's not completely hidden though 
because users can retrieve the value via `DynamicRecord#distributionMode()`.



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -411,12 +488,32 @@ public DataStreamSink append() {
   .name(operatorName("generator"))
   .returns(type);
 
-  DataStreamSink rowDataDataStreamSink =
+  // Forward writer: chained with generator via forward edge, no data 
shuffle
+  ForwardWriterSink forwardWriterSink =
+  new ForwardWriterSink(
+  sink.catalogLoader, sink.writeProperties, sink.flinkConfig, 
sink.cacheMaximumSize);
+  TypeInformation> 
writeResultTypeInfo =
+  CommittableMessageTypeInfo.of(sink::getWriteResultSerializer);
+
+  DataStream> forwardWritten =
+  converted
+  .getSideOutput(
+  new 
OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
+  .transform(
+  operatorName("Forward-Writer"),
+  writeResultTypeInfo,
+  new SinkWriterOperatorFactory<>(forwardWriterSink))
+  .uid(prefixIfNotNull(uidPrefix, "-forward-writer"));
+
+  // Inject forward write results into sink — they'll be unioned in 
addPreCommitTopology
+  sink.setForwardWriteResults(forwardWritten);

Review Comment:
   Why are we not creating the `forwardWritten` DynamicSink in the `build()` 
method? Then we wouldn't have to set it here and could pass it directly to 
`DynamicIcebergSink`.



##
docs/docs/flink-writes.md:
##
@@ -547,6 +546,28 @@ The Dynamic Iceberg Flink Sink is configured using the 
Builder pattern. Here are
 | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new 
Iceberg tables, allows overriding how tables are created - setting custom table 
properties and location based on the table name. |
 | `dropUnusedColumns(boolean enabled)` | When enabled, drops 
all columns from the current table schema which are not contained in the input 
schema (see the caveats above on dropping columns).  |
 
+### Distribution Modes
+
+The `DistributionMode` set on each `DynamicRecord` controls how that record is 
routed from the processor to the writer:
+
+| Mode  | Behavior |
+|---|--|
+| `NONE`| Records are distributed across writer subtasks in a 
round-robin fashion (or by equality fields if set). |
+| `HASH`| Records are distributed by partition key (partitioned 
tables) or equality fields (unpartitioned tables). Ensures that records for the 
same partition are handled by the same writer subtask. |
+| (unspecified) | Forward mode: bypasses distribution entirely and sends 
records directly via a forward edge (see below). |

Review Comment:
   ```suggestion
   | `null` | Forward mode: bypasses distribution entirely and sends records 
directly via a forward edge (see below). |
   ```
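
To make the three rows of the table concrete, here is a minimal routing sketch. It is an assumption-laden illustration, not the sink's actual partitioner: the `Mode` enum stands in for Iceberg's `DistributionMode`, the `route` method and its parameters are hypothetical, and the equality-field handling mentioned for `NONE` and `HASH` is left out for brevity.

```java
import java.util.Objects;

// Illustrative only: Mode is a stand-in for DistributionMode and route() is a
// hypothetical helper; equality-field routing is intentionally omitted.
final class DistributionRoutingSketch {
  enum Mode {
    NONE,
    HASH
  }

  /**
   * Picks the writer subtask for a record.
   *
   * @param mode distribution mode, or null for forward mode
   * @param partitionKey partition key of the record (used by HASH only)
   * @param sourceSubtask subtask that produced the record
   * @param recordIndex running, non-negative record counter (used by NONE only)
   * @param writers number of writer subtasks
   */
  static int route(
      Mode mode, String partitionKey, int sourceSubtask, long recordIndex, int writers) {
    if (mode == null) {
      // Forward mode: no shuffle, the record stays on the producing subtask.
      return sourceSubtask;
    }

    switch (mode) {
      case HASH:
        // Same partition key always maps to the same writer subtask.
        return Math.floorMod(Objects.hashCode(partitionKey), writers);
      case NONE:
      default:
        // Round-robin across writer subtasks.
        return (int) Math.floorMod(recordIndex, (long) writers);
    }
  }

  public static void main(String[] args) {
    System.out.println("forward -> subtask " + route(null, "p=1", 3, 10L, 4));
    System.out.println("NONE    -> subtask " + route(Mode.NONE, "p=1", 3, 5L, 4));
    System.out.println("HASH    -> subtask " + route(Mode.HASH, "p=1", 3, 5L, 4));
  }
}
```

The point of the sketch is the `null` branch: forward records never consult the writer count, which is why they can travel over a chainable forward edge.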



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer 
getWriteResultSerializer()
 return new DynamicWriteResultSerializer();
   }
 
+  /**
+   * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the 
forward write path.
+   * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} 
emits committables
+   * downstream. The committer is never called — committing is handled by the 
main sink.
+   */
+  @VisibleForTesting
+  static class ForwardWriterSink
+  implements Sink, 
SupportsCommitter {
+
+private final CatalogLoader catalogLoader;
+private final Map writeProperties;
+private final Configuration flinkConfig;
+private final int cacheMaximumSize;
+
+ForwardWriterSink(
+CatalogLoader catalogLoader,
+Map writeProperties,
+Configuration flinkConfig,
+ 

Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-26 Thread via GitHub


sqd commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-4136901873

   [Image: https://github.com/user-attachments/assets/0c821820-875e-40d2-b82b-c583bff08c92]
   





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-26 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2996543644


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -430,16 +438,29 @@ public DataStreamSink append() {
   .uid(prefixIfNotNull(uidPrefix, "-updater"))
   .name(operatorName("Updater"))
   .returns(type)
-  .union(converted)
-  .sinkTo(sink)
-  .uid(prefixIfNotNull(uidPrefix, "-sink"));
+  .union(converted);
+
+  DataStreamSink shuffleSinkResult =
+  shuffleInput.sinkTo(shuffleSink).uid(prefixIfNotNull(uidPrefix, 
"-shuffle-sink"));
 
   FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, 
readableConfig);
   if (flinkWriteConf.writeParallelism() != null) {
-
rowDataDataStreamSink.setParallelism(flinkWriteConf.writeParallelism());
+shuffleSinkResult.setParallelism(flinkWriteConf.writeParallelism());
   }
 
-  return rowDataDataStreamSink;
+  // Forward sink: handles forward records (distributionMode == null, 
chainable)
+  DynamicIcebergSink forwardSink = build(true);
+  OutputTag forwardTag =
+  new OutputTag<>(
+  DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM,
+  new DynamicRecordInternalType(catalogLoader, true, 
cacheMaximumSize));
+
+  converted
+  .getSideOutput(forwardTag)
+  .sinkTo(forwardSink)
+  .uid(prefixIfNotNull(uidPrefix, "-forward-sink"));
+
+  return shuffleSinkResult;

Review Comment:
   As discussed, I changed this so that we can union the committables and 
commit them in the same committer.
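
The design change described here, two writer paths feeding one committer, can be sketched without any Flink types. Everything below is hypothetical and for illustration only: `ToyCommittable`, `ToyCommitter`, and the file names are invented, and plain lists stand in for the two streams of write results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: plain lists stand in for the shuffle and forward
// write-result streams; none of these types exist in Flink or Iceberg.
final class UnionCommitSketch {
  static final class ToyCommittable {
    final String table;
    final String dataFile;

    ToyCommittable(String table, String dataFile) {
      this.table = table;
      this.dataFile = dataFile;
    }
  }

  static final class ToyCommitter {
    private final List<String> committedFiles = new ArrayList<>();
    private int commitCalls = 0;

    // One commit call covers results from both writer paths.
    void commit(List<ToyCommittable> committables) {
      commitCalls++;
      for (ToyCommittable committable : committables) {
        committedFiles.add(committable.table + "/" + committable.dataFile);
      }
    }

    int commitCalls() {
      return commitCalls;
    }

    List<String> committedFiles() {
      return Collections.unmodifiableList(committedFiles);
    }
  }

  // Merges the results of the shuffle path and the forward path.
  static List<ToyCommittable> union(
      List<ToyCommittable> shuffled, List<ToyCommittable> forwarded) {
    List<ToyCommittable> all = new ArrayList<>(shuffled);
    all.addAll(forwarded);
    return all;
  }

  static ToyCommitter demo() {
    List<ToyCommittable> shuffled =
        Arrays.asList(new ToyCommittable("db.t1", "shuffle-0.parquet"));
    List<ToyCommittable> forwarded =
        Arrays.asList(new ToyCommittable("db.t1", "forward-0.parquet"));

    ToyCommitter committer = new ToyCommitter();
    committer.commit(union(shuffled, forwarded));
    return committer;
  }

  public static void main(String[] args) {
    ToyCommitter committer = demo();
    System.out.println(committer.commitCalls() + " commit: " + committer.committedFiles());
  }
}
```

Compared with two independent sinks, a single committer sees the files from both paths together, so they can land in the same commit.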






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-21 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2969802304


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java:
##
@@ -59,7 +74,7 @@ public DynamicRecord(
   Schema schema,
   RowData rowData,
   PartitionSpec partitionSpec,
-  DistributionMode distributionMode,
+  @Nullable DistributionMode distributionMode,
   int writeParallelism) {

Review Comment:
   Good point. I changed this and reworded the other constructor.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-20 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2964644977


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java:
##
@@ -34,11 +34,25 @@ public class DynamicRecord {
   private Schema schema;
   private RowData rowData;
   private PartitionSpec partitionSpec;
-  private DistributionMode distributionMode;
+  @Nullable private DistributionMode distributionMode;
   private int writeParallelism;
   private boolean upsertMode;
   @Nullable private Set equalityFields;
 
+  /**
+   * Constructs a new DynamicRecord with {@code null} distributionMode, 
indicating forward (no
+   * shuffle) writes.
+   */
+  public DynamicRecord(
+  TableIdentifier tableIdentifier,
+  String branch,
+  Schema schema,
+  RowData rowData,
+  PartitionSpec partitionSpec,
+  int writeParallelism) {
+this(tableIdentifier, branch, schema, rowData, partitionSpec, null, 
writeParallelism);

Review Comment:
   ```suggestion
 public DynamicRecord(
 TableIdentifier tableIdentifier,
 String branch,
 Schema schema,
 RowData rowData,
 PartitionSpec partitionSpec) {
   this(tableIdentifier, branch, schema, rowData, partitionSpec, null, -1);
   ```
   
   Write parallelism doesn't make sense with the direct write path.
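
The constructor shape suggested above can be sketched as follows. This is a simplified, hypothetical model and not the real `DynamicRecord`: `ToyRecord` keeps only a table name, and `Mode` stands in for `DistributionMode`. It shows the short constructor delegating with `null` and `-1`, so callers on the forward path never supply a write parallelism.

```java
// Illustrative only: ToyRecord is a cut-down stand-in for DynamicRecord and
// Mode is a stand-in for DistributionMode.
final class ForwardConstructorSketch {
  enum Mode {
    NONE,
    HASH
  }

  static final class ToyRecord {
    private final String table;
    private final Mode distributionMode; // null means forward (no shuffle)
    private final int writeParallelism; // -1 means "not applicable"

    // Forward path: no distribution mode and no write parallelism to choose.
    ToyRecord(String table) {
      this(table, null, -1);
    }

    // Shuffle path: the caller supplies a mode and a parallelism.
    ToyRecord(String table, Mode distributionMode, int writeParallelism) {
      this.table = table;
      this.distributionMode = distributionMode;
      this.writeParallelism = writeParallelism;
    }

    String table() {
      return table;
    }

    Mode distributionMode() {
      return distributionMode;
    }

    int writeParallelism() {
      return writeParallelism;
    }

    boolean isForward() {
      return distributionMode == null;
    }
  }

  public static void main(String[] args) {
    ToyRecord forward = new ToyRecord("db.t1");
    ToyRecord shuffled = new ToyRecord("db.t1", Mode.HASH, 4);
    System.out.println("forward record is forward:  " + forward.isForward());
    System.out.println("shuffled record is forward: " + shuffled.isForward());
  }
}
```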



##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java:
##
@@ -59,7 +74,7 @@ public DynamicRecord(
   Schema schema,
   RowData rowData,
   PartitionSpec partitionSpec,
-  DistributionMode distributionMode,
+  @Nullable DistributionMode distributionMode,
   int writeParallelism) {

Review Comment:
   ```suggestion
 DistributionMode distributionMode,
 int writeParallelism) {
   ```
   
   We can add the Nullable to the field itself, but this constructor is for 
when we actually have a DistributionMode. IMHO the nullability of 
DistributionMode should be an implementation detail.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-20 Thread via GitHub


pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2964463353


##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -1345,6 +1465,100 @@ public Committer 
createCommitter(CommitterInitContext contex
 }
   }
 
+  static class ShuffleOrForwardTrackingBuilder extends 
DynamicIcebergSink.Builder {
+static List forwardRecords = new 
CopyOnWriteArrayList<>();
+static List shuffleRecords = new 
CopyOnWriteArrayList<>();
+
+static void reset() {
+  forwardRecords.clear();
+  shuffleRecords.clear();
+}
+
+@Override
+DynamicIcebergSink instantiateSink(
+Map writeProperties, Configuration flinkConfig, 
boolean forwardOnly) {
+  return new TrackingSink(
+  CATALOG_EXTENSION.catalogLoader(),
+  Collections.emptyMap(),
+  "uidPrefix",
+  writeProperties,
+  flinkConfig,
+  100,
+  forwardOnly);
+}
+
+private static class TrackingSink extends DynamicIcebergSink {
+  private final boolean forwardOnly;
+
+  TrackingSink(
+  CatalogLoader catalogLoader,
+  Map snapshotProperties,
+  String uidPrefix,
+  Map writeProperties,
+  Configuration flinkConfig,
+  int cacheMaximumSize,
+  boolean forwardOnly) {
+super(
+catalogLoader,
+snapshotProperties,
+uidPrefix,
+writeProperties,
+flinkConfig,
+cacheMaximumSize,
+forwardOnly);
+this.forwardOnly = forwardOnly;
+  }
+
+  @Override
+  public SinkWriter createWriter(WriterInitContext 
context) {
+return new TrackingWriter(
+(CommittingSinkWriter)
+super.createWriter(context),
+forwardOnly);
+  }
+}
+
+private static class TrackingWriter
+implements CommittingSinkWriter {
+  private final CommittingSinkWriter delegate;
+  private final boolean forwardOnly;
+
+  TrackingWriter(
+  CommittingSinkWriter 
delegate,
+  boolean forwardOnly) {
+this.delegate = delegate;
+this.forwardOnly = forwardOnly;
+  }
+
+  @Override
+  public void write(DynamicRecordInternal element, Context context)
+  throws IOException, InterruptedException {
+if (forwardOnly) {
+  forwardRecords.add(element);
+} else {
+  shuffleRecords.add(element);
+}
+delegate.write(element, context);

Review Comment:
   nit: newline






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-20 Thread via GitHub


pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2964456455


##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -236,6 +250,109 @@ void testWrite() throws Exception {
 runTest(rows);
   }
 
+  @Test
+  void testNoShuffleTopology() throws Exception {
+DataStream dataStream =
+env.fromData(
+Collections.emptyList(), TypeInformation.of(new 
TypeHint() {}));
+DynamicIcebergSink.forInput(dataStream)
+.generator(new Generator())
+.catalogLoader(CATALOG_EXTENSION.catalogLoader())
+.writeParallelism(2)
+.immediateTableUpdate(false)
+.overwrite(false)
+.append();
+
+boolean generatorAndSinkChained = false;
+for (JobVertex vertex : env.getStreamGraph().getJobGraph().getVertices()) {
+  boolean generatorInThisVertex = false;
+  boolean sinkInThisVertex = false;
+  for (OperatorIDPair operatorID : vertex.getOperatorIDs()) {
+String uid = operatorID.getUserDefinedOperatorUid();
+if (uid == null) {
+  continue;
+}
+if (uid.endsWith("-forward-sink")) {

Review Comment:
   nit: newline



##
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##
@@ -236,6 +250,109 @@ void testWrite() throws Exception {
 runTest(rows);
   }
 
+  @Test
+  void testNoShuffleTopology() throws Exception {
+DataStream dataStream =
+env.fromData(
+Collections.emptyList(), TypeInformation.of(new 
TypeHint() {}));
+DynamicIcebergSink.forInput(dataStream)
+.generator(new Generator())
+.catalogLoader(CATALOG_EXTENSION.catalogLoader())
+.writeParallelism(2)
+.immediateTableUpdate(false)
+.overwrite(false)
+.append();
+
+boolean generatorAndSinkChained = false;
+for (JobVertex vertex : env.getStreamGraph().getJobGraph().getVertices()) {
+  boolean generatorInThisVertex = false;
+  boolean sinkInThisVertex = false;
+  for (OperatorIDPair operatorID : vertex.getOperatorIDs()) {
+String uid = operatorID.getUserDefinedOperatorUid();
+if (uid == null) {
+  continue;
+}
+if (uid.endsWith("-forward-sink")) {
+  sinkInThisVertex = true;
+} else if (uid.endsWith("-generator")) {
+  generatorInThisVertex = true;
+}
+  }
+  generatorAndSinkChained = generatorInThisVertex && sinkInThisVertex;

Review Comment:
   nit: newline






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-20 Thread via GitHub


pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r296746


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -430,16 +438,29 @@ public DataStreamSink append() {
   .uid(prefixIfNotNull(uidPrefix, "-updater"))
   .name(operatorName("Updater"))
   .returns(type)
-  .union(converted)
-  .sinkTo(sink)
-  .uid(prefixIfNotNull(uidPrefix, "-sink"));
+  .union(converted);
+
+  DataStreamSink shuffleSinkResult =
+  shuffleInput.sinkTo(shuffleSink).uid(prefixIfNotNull(uidPrefix, 
"-shuffle-sink"));
 
   FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, 
readableConfig);
   if (flinkWriteConf.writeParallelism() != null) {
-
rowDataDataStreamSink.setParallelism(flinkWriteConf.writeParallelism());
+shuffleSinkResult.setParallelism(flinkWriteConf.writeParallelism());
   }
 
-  return rowDataDataStreamSink;
+  // Forward sink: handles forward records (distributionMode == null, 
chainable)
+  DynamicIcebergSink forwardSink = build(true);
+  OutputTag forwardTag =
+  new OutputTag<>(
+  DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM,
+  new DynamicRecordInternalType(catalogLoader, true, 
cacheMaximumSize));
+
+  converted
+  .getSideOutput(forwardTag)
+  .sinkTo(forwardSink)
+  .uid(prefixIfNotNull(uidPrefix, "-forward-sink"));
+
+  return shuffleSinkResult;

Review Comment:
   Do I understand correctly that we create 2 instances of DynamicSinks here?
   So basically the 2 sinks don't share the 
writers/pre-commit-topology/committers etc.?
   Also, do I understand correctly that the resulting stream only contains the 
output of the shuffling sink?






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-20 Thread via GitHub


pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2964420074


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -135,6 +138,9 @@ public void addPostCommitTopology(
   @Override
   public DataStream addPreWriteTopology(
   DataStream inputDataStream) {
+if (forwardOnly) {
+  return inputDataStream;
+}

Review Comment:
   nit: add a newline after the block






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-18 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2954403661


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java:
##
@@ -134,6 +135,14 @@ public void setUpsertMode(boolean upsertMode) {
 this.upsertMode = upsertMode;
   }
 
+  public boolean noShuffle() {
+return noShuffle;
+  }
+
+  public void setNoShuffle(boolean noShuffle) {
+this.noShuffle = noShuffle;
+  }
+

Review Comment:
   Added a new constructor and removed the flag.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-09 Thread via GitHub


mxm commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-4024562413

   +1 on breaking up the change, but we need both before the release. 
Otherwise, users would lose some options.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-09 Thread via GitHub


pvary commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-4022307835

   We will have a release soon. Losing the `ROUND_ROBIN` functionality seems 
like an issue to me.
   We should find a good solution before jumping the gun and making random
changes.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-08 Thread via GitHub


aiborodin commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-4020257861

   How about we split this change into two distinct PRs:
1. PR 1: Fix the existing `DistributionMode.NONE` to pass records through 
to the chained writer.
2. PR 2: Add `DistributionMode.ROUND_ROBIN` with all required changes and 
discussions for Spark, etc.
   
   It would allow us to separate the concerns and unblock the original change 
while we discuss the new distribution mode with the community.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-04 Thread via GitHub


sqd commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3998741585

   👍 I sent an email to the dev mailing list. Please feel free to chime in.
   
   https://lists.apache.org/thread/dwcmrkm8r90knjm3smmwhotkgjobz69d





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-04 Thread via GitHub


mxm commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3996869642

   +1 Since this is a core change, we should be notifying the community. I 
think we can make a good case for adding the round-robin distribution mode.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-04 Thread via GitHub


pvary commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3996790798

   > Hi, @mxm @pvary I would like some of your guidance here if you don't mind. 
To make the CI pass, I had to make some changes to the Spark code. The new 
ROUND_ROBIN distribution mode (which is a core API enum) was causing some Spark 
tests to fail because the tests iterate through all possible distribution modes 
[on this 
line](https://github.com/apache/iceberg/blob/main/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java#L1210-L1211).
   
   Before making such a change, please discuss it with the community on the dev
list.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-04 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2883052755


##
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java:
##
@@ -76,6 +76,9 @@ public static SparkWriteRequirements writeRequirements(
   private static Distribution writeDistribution(Table table, DistributionMode mode) {
 switch (mode) {
   case NONE:
+// Spark's connector API has no round-robin distribution concept, so treat it the same
+// as unspecified and let Spark distribute data across tasks as it sees fit
+  case ROUND_ROBIN:
 return Distributions.unspecified();

Review Comment:
   I think we should add an extra case (no fall through) and fail until 
round-robin distribution mode is implemented in Spark.
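
The explicit, non-fall-through case suggested above could look roughly like the sketch below. The `Mode` enum, the class name, and the string return values are hypothetical stand-ins so that the example compiles on its own; they are not the actual `SparkWriteUtil` code or Spark's `Distribution` types.

```java
// Hypothetical stand-in for the distribution modes named in this thread.
enum Mode { NONE, HASH, RANGE, ROUND_ROBIN }

final class WriteDistributionSketch {
  private WriteDistributionSketch() {}

  // Returns a label instead of a Spark Distribution to stay dependency-free.
  static String writeDistribution(Mode mode) {
    switch (mode) {
      case NONE:
        return "unspecified";
      case ROUND_ROBIN:
        // Separate case, no fall-through: fail loudly instead of silently
        // treating ROUND_ROBIN as an alias for NONE.
        throw new UnsupportedOperationException(
            "ROUND_ROBIN distribution mode is not supported in Spark yet");
      case HASH:
        return "clustered";
      case RANGE:
        return "ordered";
      default:
        throw new IllegalArgumentException("Unexpected distribution mode: " + mode);
    }
  }
}
```

With this shape, a test that iterates over all enum values would surface the unsupported mode as an exception rather than as silently changed write behavior.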






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-04 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2883057079


##
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java:
##
@@ -106,6 +106,9 @@ public static SparkWriteRequirements writeRequirements(
   private static Distribution writeDistribution(Table table, DistributionMode mode) {
 switch (mode) {
   case NONE:
+// Spark's connector API has no round-robin distribution concept, so treat it the same
+// as unspecified and let Spark distribute data across tasks as it sees fit
+  case ROUND_ROBIN:
 return Distributions.unspecified();

Review Comment:
   I think we should add an extra case (no fall through) and fail until 
round-robin distribution mode is implemented in Spark.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-03 Thread via GitHub


sqd commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3993587678

   Hi, @mxm @pvary I would like some of your guidance here if you don't mind. 
To make the CI pass, I had to make some changes to the Spark code. The new 
ROUND_ROBIN distribution mode (which is a core API enum) was causing some Spark 
tests to fail because the tests iterate through all possible distribution modes 
[on this 
line](https://github.com/apache/iceberg/blob/main/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java#L1210-L1211).
 I fixed the tests by treating ROUND_ROBIN as an alias for NONE in Spark, 
because it seems the Spark connector doesn't have a similar concept. I am 
pretty new to this project and not sure if that's acceptable.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-03 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2878730306


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java:
##
@@ -120,6 +120,9 @@ private KeySelector getKeySelector(
 "Creating new KeySelector for table '{}' with distribution mode '{}'", tableName, mode);
 switch (mode) {
   case NONE:
+return row -> 0;

Review Comment:
   Ideally, we would throw here and prevent this path from being executed.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-03 Thread via GitHub


mxm commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3991585480

   You are right, we missed that. This is indeed achieving what we discussed.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-03 Thread via GitHub


sqd commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3991481205

   > I still see the flag in the above commit. We need to remove the flag and 
add a direct chained path, as an add-on to the existing topology.
   
   The flag is internal, not exposed through the public builder. I use a flag
so I don't have to copy the code of DynamicIcebergSink into a very similar
DynamicIcebergForwardSink or something. Instead we can reuse most of the logic.
   
   > side output
   
   Yes that's what I am doing (unless I am still missing something?).
   
   1. [These 
lines](https://github.com/apache/iceberg/pull/15433/changes#diff-801a5d7fe4c27b9fb615ad487faf3cad0215058556aa8655c8f9ca72e644aeb9R420-R441)
 create and wire the normal path non-chained variant sink
   2. [These 
lines](https://github.com/apache/iceberg/pull/15433/changes#diff-801a5d7fe4c27b9fb615ad487faf3cad0215058556aa8655c8f9ca72e644aeb9R451-R461)
 do the same for the new path chained variant sink
   
   Sorry about the CI failure. I know what's going on but I'm a little too
swamped to fix it right now. I'll address it as soon as I can.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-03 Thread via GitHub


mxm commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3990817691

   Unless there is more to come after b7fe04c, I think there is indeed a 
misunderstanding. Let's recap the criteria (1) from above:
   
   >1. No special flag at all. We always create a side output topology for the 
direct path.
   
   I still see the flag in the above commit. We need to remove the flag and add 
a direct chained path, as an add-on to the existing topology.
   
   >Is there a way to have duplicated instances of writers (some chained, some 
are not chained) and route things out of the chained versions based on the 
mode? Sideoutput seems like a possibility, but maybe complicating the things so 
much is an overkill.
   
   Yes, this is what I suggested above, and I think we had agreement on it.
Using a side output is the only option when we want a chained and a non-chained
variant. "Side output" is just a fancy word for adding another output.
Semantically, it is not different from the main output. Another option would be
to multiplex via one output, but I think that makes things more complicated and
harder to maintain.
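
The routing idea can be modelled in plain Java as follows. This is only a sketch and not Flink code: the class and field names are hypothetical, and the convention that a `null` mode marks a record for the forward path is taken from the code comment in the PR diff ("distributionMode == null, chainable").

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "one operator, two outputs". All names are hypothetical.
final class RoutingSketch {
  enum Mode { HASH, RANGE, ROUND_ROBIN }

  static final class Rec {
    final String table;
    final Mode mode; // null means: take the chained forward path

    Rec(String table, Mode mode) {
      this.table = table;
      this.mode = mode;
    }
  }

  // Main output: consumed by the non-chained sink behind a shuffle.
  final List<Rec> shuffleOutput = new ArrayList<>();
  // Extra ("side") output: consumed by the chained forward sink.
  final List<Rec> forwardOutput = new ArrayList<>();

  // Both outputs exist from the start; the per-record mode only decides
  // which of the two a record is emitted to.
  void process(Rec rec) {
    if (rec.mode == null) {
      forwardOutput.add(rec);
    } else {
      shuffleOutput.add(rec);
    }
  }

  static RoutingSketch route(List<Rec> input) {
    RoutingSketch router = new RoutingSketch();
    input.forEach(router::process);
    return router;
  }
}
```

The point of the model is that the set of outputs is fixed when the job graph is built, so a per-record decision can still be honored without changing the topology at runtime.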





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-03-03 Thread via GitHub


pvary commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3990471727

   @mxm, @sqd: I think we have a misunderstanding here. In the `DynamicSink`,
the `mode` is currently set per record, while the pipeline is created when the
sink is created. If we create the sink in a way that the operators are chained,
then we can't depend on the mode set by the records themselves for routing.
   
   Is there a way to have duplicated instances of writers (some chained, some 
are not chained) and route things out of the chained versions based on the 
mode? Sideoutput seems like a possibility, but maybe complicating the things so 
much is an overkill.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-02-25 Thread via GitHub


sqd commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3961254603

   > That said, the potential performance improvements need to outweigh the 
slight increase in complexity
   
   I actually have some numbers! Before the change the pipeline took around
1~1.5TB of memory and ~200 cores. With the change it shaved 50~70 cores (not to
mention the increased throughput). Of course there are other computations going
on as well, but Parquet writing and Flink RowData serdes showed up in the
profiler as taking >90% of CPU combined. Serdes was taking up around 75% of the
CPU of the actual Parquet writing.
   
   > Could you share a bit more about your use case
   
   My use case is that I have a firehose of data that I want to ingest into
Iceberg. Because the volume is so high, it doesn't really matter which writer
subtask a record is routed to; there won't be small files either way. I was
running DistributionMode.NONE, and noticed that serdes was taking up a
ridiculous amount of resources and also caused a lot of unnecessary network
shuffling.
   
   > adding a new DistributionMode
   
   I am a big fan of calling it ROUND_ROBIN instead, but are we not worried
about breaking existing code? Maybe introduce ROUND_ROBIN as an alias for NONE,
and this new mode can be called "PASSTHROUGH" or something?





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-02-25 Thread via GitHub


sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2854642069


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -411,28 +447,38 @@ public DataStreamSink append() {
   .name(operatorName("generator"))
   .returns(type);
 
-  DataStreamSink rowDataDataStreamSink =
-  converted
-  .getSideOutput(
-  new OutputTag<>(
-  DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM,
-  new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)))
-  .keyBy((KeySelector) DynamicRecordInternal::tableName)
-  .map(
-  new DynamicTableUpdateOperator(
-  catalogLoader,
-  cacheMaximumSize,
-  cacheRefreshMs,
-  inputSchemasPerTableCacheMaximumSize,
-  tableCreator,
-  caseSensitive,
-  dropUnusedColumns))
-  .uid(prefixIfNotNull(uidPrefix, "-updater"))
-  .name(operatorName("Updater"))
-  .returns(type)
-  .union(converted)
-  .sinkTo(sink)
-  .uid(prefixIfNotNull(uidPrefix, "-sink"));
+  DataStreamSink rowDataDataStreamSink;
+  if (passthroughRecords) {
+if (!immediateUpdate) {
+  throw new UnsupportedOperationException(
+  "Immediate update must be enabled to pass through records");
+}
+rowDataDataStreamSink = converted.sinkTo(sink).uid(prefixIfNotNull(uidPrefix, "-sink"));
+  } else {

Review Comment:
   > to add a new chained side output with an extra DynamicWriter
   
   Could you elaborate on this please? I could be wrong but my understanding is 
that Flink cannot chain any operator with a side output.
   
   > rename it to ROUND_ROBIN and use NONE
   
   Yes, that sounds like a big improvement in semantics.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-02-25 Thread via GitHub


pvary commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3959846798

   > I think the idea here is valid
   
   You have either a very tricky balancing logic before the sink, or every 
table of yours is similarly loaded and continuously written. Not too dynamic 
IMHO 😄  
   
   





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-02-25 Thread via GitHub


mxm commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3958599768

   I think the idea here is valid, but we should implement this in a clean way,
e.g. by adding a new `DistributionMode` and adding an additional chained writer
to the processor. We don't want to silently ignore other distribution modes
based on some additional flag (`passthroughRecords(true)`). That said, the
potential performance improvements need to outweigh the slight increase in
complexity due to the changes in the job topology.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-02-25 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2852440369


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -411,28 +447,38 @@ public DataStreamSink append() {
   .name(operatorName("generator"))
   .returns(type);
 
-  DataStreamSink rowDataDataStreamSink =
-  converted
-  .getSideOutput(
-  new OutputTag<>(
-  DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM,
-  new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)))
-  .keyBy((KeySelector) DynamicRecordInternal::tableName)
-  .map(
-  new DynamicTableUpdateOperator(
-  catalogLoader,
-  cacheMaximumSize,
-  cacheRefreshMs,
-  inputSchemasPerTableCacheMaximumSize,
-  tableCreator,
-  caseSensitive,
-  dropUnusedColumns))
-  .uid(prefixIfNotNull(uidPrefix, "-updater"))
-  .name(operatorName("Updater"))
-  .returns(type)
-  .union(converted)
-  .sinkTo(sink)
-  .uid(prefixIfNotNull(uidPrefix, "-sink"));
+  DataStreamSink rowDataDataStreamSink;
+  if (passthroughRecords) {
+if (!immediateUpdate) {
+  throw new UnsupportedOperationException(
+  "Immediate update must be enabled to pass through records");
+}
+rowDataDataStreamSink = converted.sinkTo(sink).uid(prefixIfNotNull(uidPrefix, "-sink"));
+  } else {

Review Comment:
   `DistributionMode.NONE` in the regular sink does strict forward partitioning 
(no redistribution), which is similar to what the PR does. It leads to Flink 
chaining the input with the writer. For DynamicSink, because we have many 
tables, the idea was to spread out the data onto the available workers, which 
is why we opted for a round-robin across the workers chosen for the table.
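
To make the contrast concrete, here is a plain-Java model of the two behaviors described above. It is a sketch under assumptions, not Flink or Iceberg code: the class names are hypothetical, and how the set of writer subtasks is chosen for a table is not covered in this thread, so it is simply passed in as an array.

```java
// Plain-Java model of forward vs. round-robin routing. Names are hypothetical.
final class PartitioningSketch {
  private PartitioningSketch() {}

  // Forward partitioning: the record stays on the subtask that produced it,
  // which is what allows the writer to be chained to its input.
  static int forward(int upstreamSubtask) {
    return upstreamSubtask;
  }

  // Round-robin over the writer subtasks assigned to one table.
  static final class RoundRobin {
    private final int[] writerSubtasks;
    private int next = 0;

    RoundRobin(int[] writerSubtasks) {
      if (writerSubtasks.length == 0) {
        throw new IllegalArgumentException("At least one writer subtask is required");
      }
      this.writerSubtasks = writerSubtasks.clone();
    }

    // Returns the next target subtask and advances the cursor, wrapping around.
    int nextSubtask() {
      int chosen = writerSubtasks[next];
      next = (next + 1) % writerSubtasks.length;
      return chosen;
    }
  }
}
```

In the forward case no record leaves its subtask, so no serialization or network transfer is needed; in the round-robin case most records are sent to a different subtask.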






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-02-25 Thread via GitHub


pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2851847940


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -411,28 +447,38 @@ public DataStreamSink append() {
   .name(operatorName("generator"))
   .returns(type);
 
-  DataStreamSink rowDataDataStreamSink =
-  converted
-  .getSideOutput(
-  new OutputTag<>(
-  DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM,
-  new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)))
-  .keyBy((KeySelector) DynamicRecordInternal::tableName)
-  .map(
-  new DynamicTableUpdateOperator(
-  catalogLoader,
-  cacheMaximumSize,
-  cacheRefreshMs,
-  inputSchemasPerTableCacheMaximumSize,
-  tableCreator,
-  caseSensitive,
-  dropUnusedColumns))
-  .uid(prefixIfNotNull(uidPrefix, "-updater"))
-  .name(operatorName("Updater"))
-  .returns(type)
-  .union(converted)
-  .sinkTo(sink)
-  .uid(prefixIfNotNull(uidPrefix, "-sink"));
+  DataStreamSink rowDataDataStreamSink;
+  if (passthroughRecords) {
+if (!immediateUpdate) {
+  throw new UnsupportedOperationException(
+  "Immediate update must be enabled to pass through records");
+}
+rowDataDataStreamSink = converted.sinkTo(sink).uid(prefixIfNotNull(uidPrefix, "-sink"));
+  } else {

Review Comment:
   How do we handle `DistributionMode` in the normal Sink?
   We should be consistent






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-02-25 Thread via GitHub


pvary commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3957669212

   @sqd Could you share a bit more about your use case? Ignoring
`DistributionMode` and chaining directly to writers feels quite risky to me, 
even if the performance gains are tempting.
   
   This approach might work if your input records are already correctly 
distributed. But any mistake there will lead to small files or skewed 
writes—fast for the writers, but potentially very costly for the readers.





Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-02-25 Thread via GitHub


mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2851572372


##
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##
@@ -411,28 +447,38 @@ public DataStreamSink append() {
   .name(operatorName("generator"))
   .returns(type);
 
-  DataStreamSink rowDataDataStreamSink =
-  converted
-  .getSideOutput(
-  new OutputTag<>(
-  DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM,
-  new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)))
-  .keyBy((KeySelector) DynamicRecordInternal::tableName)
-  .map(
-  new DynamicTableUpdateOperator(
-  catalogLoader,
-  cacheMaximumSize,
-  cacheRefreshMs,
-  inputSchemasPerTableCacheMaximumSize,
-  tableCreator,
-  caseSensitive,
-  dropUnusedColumns))
-  .uid(prefixIfNotNull(uidPrefix, "-updater"))
-  .name(operatorName("Updater"))
-  .returns(type)
-  .union(converted)
-  .sinkTo(sink)
-  .uid(prefixIfNotNull(uidPrefix, "-sink"));
+  DataStreamSink rowDataDataStreamSink;
+  if (passthroughRecords) {
+if (!immediateUpdate) {
+  throw new UnsupportedOperationException(
+  "Immediate update must be enabled to pass through records");
+}
+rowDataDataStreamSink = converted.sinkTo(sink).uid(prefixIfNotNull(uidPrefix, "-sink"));
+  } else {

Review Comment:
   This will ignore `DistributionMode` and partitioning in `DynamicRecord`. I
saw that you listed this in the docs, but I'm not sure we should diverge too
much from the normal mode of operation. I think what we can do is add a new
chained side output with an extra DynamicWriter for this quick path.
   
   It may be worth adding a new DistributionMode. Currently NONE does a
round-robin, which is slightly confusing; we could rename it to ROUND_ROBIN and
use NONE for this direct path.






Re: [PR] Flink: Add passthroughRecords option to DynamicIcebergSink [iceberg]

2026-02-24 Thread via GitHub


sqd commented on PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#issuecomment-3953184849

   @mxm @pvary I would appreciate it if you could please take a look. I'm happy
to provide any detail/context. I have tested this on an internal pipeline which
processes around 10TB~20TB of data per hour, where this change has drastically
reduced resource usage and increased output.

