Re: [PR] Initial Iceberg Sink [beam]
kennknowles commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1597044554 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java: ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +class AppendFilesToTables +extends PTransform, PCollection>> { + + private final IcebergCatalogConfig catalogConfig; + + AppendFilesToTables(IcebergCatalogConfig catalogConfig) { +this.catalogConfig = catalogConfig; + } + + @Override + public PCollection> expand(PCollection writtenFiles) { + +// Apply any sharded writes and flatten everything for catalog updates +return writtenFiles +.apply( +"Key metadata updates by table", +WithKeys.of( +new SerializableFunction() { + @Override + public String apply(FileWriteResult input) { +return input.getTableIdentifier().toString(); + } +})) +// .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder())) +.apply("Group metadata updates by table", GroupByKey.create()) +.apply( +"Append metadata updates to tables", +ParDo.of(new AppendFilesToTablesDoFn(catalogConfig))) +.setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class))); + } + + private static class AppendFilesToTablesDoFn + extends DoFn>, KV> { + +private final IcebergCatalogConfig catalogConfig; + +private 
transient @MonotonicNonNull Catalog catalog; + +private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) { + this.catalogConfig = catalogConfig; +} + +private Catalog getCatalog() { + if (catalog == null) { +catalog = catalogConfig.catalog(); + } + return catalog; +} + +@ProcessElement +public void processElement( +@Element KV<String, Iterable<FileWriteResult>> element, +OutputReceiver<KV<String, Snapshot>> out, +BoundedWindow window) { + Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); + AppendFiles update = table.newAppend(); + for (FileWriteResult writtenFile : element.getValue()) { +update.appendFile(writtenFile.getDataFile()); + } + update.commit(); Review Comment: I am not that familiar with the Iceberg libraries. I was under the impression that the optimistic concurrency protocol was handled by them (https://iceberg.apache.org/docs/1.5.2/reliability/#concurrent-write-operations and on filesystem tables described by https://iceberg.apache.org/spec/#file-system-tables). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
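To make the optimistic concurrency idea in the comment above concrete, here is a minimal sketch of the general pattern the linked Iceberg page describes: each writer builds new table state from the state it read, then commits with an atomic swap and retries if another writer got there first. This is a toy model, not the Iceberg API. `OptimisticAppendSketch`, `TableState`, and the file names are hypothetical, and an in-memory `AtomicReference` stands in for the catalog's atomic metadata pointer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of optimistic commits. Hypothetical names; not the Iceberg API. */
public class OptimisticAppendSketch {

  /** Hypothetical stand-in for the table metadata that a commit swaps in. */
  static final class TableState {
    final int version;
    final List<String> files;

    TableState(int version, List<String> files) {
      this.version = version;
      this.files = Collections.unmodifiableList(new ArrayList<>(files));
    }
  }

  // Assumption: the catalog offers an atomic swap; AtomicReference models it here.
  private final AtomicReference<TableState> current =
      new AtomicReference<>(new TableState(0, new ArrayList<>()));

  /** Appends files, retrying the atomic swap until this writer's commit lands. */
  TableState append(List<String> newFiles) {
    while (true) {
      TableState base = current.get();
      List<String> merged = new ArrayList<>(base.files);
      merged.addAll(newFiles);
      TableState next = new TableState(base.version + 1, merged);
      if (current.compareAndSet(base, next)) {
        return next;
      }
      // Another writer committed first: loop to re-read the base and retry.
    }
  }

  TableState snapshot() {
    return current.get();
  }

  public static void main(String[] args) throws InterruptedException {
    OptimisticAppendSketch table = new OptimisticAppendSketch();
    Thread a = new Thread(() -> table.append(Arrays.asList("a-1.parquet", "a-2.parquet")));
    Thread b = new Thread(() -> table.append(Arrays.asList("b-1.parquet")));
    a.start();
    b.start();
    a.join();
    b.join();
    TableState s = table.snapshot();
    // Both appends land regardless of interleaving: two commits, three files.
    System.out.println("version=" + s.version + " files=" + s.files.size());
  }
}
```

In this model no external coordinator is needed for correctness; a losing writer only pays for a retry. Whether a particular Iceberg catalog implementation gives the same guarantee depends on how it implements the atomic swap.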
Re: [PR] Initial Iceberg Sink [beam]
jbonofre commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1594958925 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java: ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +class AppendFilesToTables +extends PTransform, PCollection>> { + + private final IcebergCatalogConfig catalogConfig; + + AppendFilesToTables(IcebergCatalogConfig catalogConfig) { +this.catalogConfig = catalogConfig; + } + + @Override + public PCollection> expand(PCollection writtenFiles) { + +// Apply any sharded writes and flatten everything for catalog updates +return writtenFiles +.apply( +"Key metadata updates by table", +WithKeys.of( +new SerializableFunction() { + @Override + public String apply(FileWriteResult input) { +return input.getTableIdentifier().toString(); + } +})) +// .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder())) +.apply("Group metadata updates by table", GroupByKey.create()) +.apply( +"Append metadata updates to tables", +ParDo.of(new AppendFilesToTablesDoFn(catalogConfig))) +.setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class))); + } + + private static class AppendFilesToTablesDoFn + extends DoFn>, KV> { + +private final IcebergCatalogConfig catalogConfig; + +private 
transient @MonotonicNonNull Catalog catalog; + +private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) { + this.catalogConfig = catalogConfig; +} + +private Catalog getCatalog() { + if (catalog == null) { +catalog = catalogConfig.catalog(); + } + return catalog; +} + +@ProcessElement +public void processElement( +@Element KV<String, Iterable<FileWriteResult>> element, +OutputReceiver<KV<String, Snapshot>> out, +BoundedWindow window) { + Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); + AppendFiles update = table.newAppend(); + for (FileWriteResult writtenFile : element.getValue()) { +update.appendFile(writtenFile.getDataFile()); + } + update.commit(); Review Comment: Sorry to be late on this. I am just wondering whether we need some kind of "commit coordinator" to ensure we have only one commit at a time: concurrent commits could be problematic in Iceberg.
Re: [PR] Initial Iceberg Sink [beam]
kennknowles merged PR #30797: URL: https://github.com/apache/beam/pull/30797
Re: [PR] Initial Iceberg Sink [beam]
kennknowles commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1557812336 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java: ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.io.iceberg.RowHelper.rowToRecord; + +import java.io.IOException; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; + +class RecordWriter { + + private final DataWriter icebergDataWriter; + + private final Table table; + + RecordWriter(Catalog catalog, IcebergDestination destination, String filename) + throws IOException { +this( +catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename); + } + + RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException { +this.table = table; + +String absoluteFilename = table.location() + "/" + filename; +OutputFile outputFile = table.io().newOutputFile(absoluteFilename); +switch (fileFormat) { + case AVRO: +icebergDataWriter = +Avro.writeData(outputFile) + .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create) +.schema(table.schema()) +.withSpec(table.spec()) +.overwrite() +.build(); +break; + case PARQUET: +icebergDataWriter = +Parquet.writeData(outputFile) +.createWriterFunc(GenericParquetWriter::buildWriter) +.schema(table.schema()) +.withSpec(table.spec()) +.overwrite() +.build(); +break; + case ORC: +throw new UnsupportedOperationException("ORC file format not currently supported."); +//icebergDataWriter = Review Comment: Done ## settings.gradle.kts: ## @@ -355,3 +355,7 @@ include("sdks:java:io:kafka:kafka-01103") findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103" include("sdks:java:managed") 
findProject(":sdks:java:managed")?.name = "managed" +include("sdks:java:io:iceberg") +findProject(":sdks:java:io:iceberg")?.name = "iceberg" +include("sdks:java:io:catalog") Review Comment: Removed
Re: [PR] Initial Iceberg Sink [beam]
chamikaramj commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1556446483 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java: ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.io.iceberg.RowHelper.rowToRecord; + +import java.io.IOException; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; + +class RecordWriter { + + private final DataWriter icebergDataWriter; + + private final Table table; + + RecordWriter(Catalog catalog, IcebergDestination destination, String filename) + throws IOException { +this( +catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename); + } + + RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException { +this.table = table; + +String absoluteFilename = table.location() + "/" + filename; +OutputFile outputFile = table.io().newOutputFile(absoluteFilename); +switch (fileFormat) { + case AVRO: +icebergDataWriter = +Avro.writeData(outputFile) + .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create) +.schema(table.schema()) +.withSpec(table.spec()) +.overwrite() +.build(); +break; + case PARQUET: +icebergDataWriter = +Parquet.writeData(outputFile) +.createWriterFunc(GenericParquetWriter::buildWriter) +.schema(table.schema()) +.withSpec(table.spec()) +.overwrite() +.build(); +break; + case ORC: +throw new UnsupportedOperationException("ORC file format not currently supported."); +//icebergDataWriter = Review Comment: Delete ? 
## settings.gradle.kts: ## @@ -355,3 +355,7 @@ include("sdks:java:io:kafka:kafka-01103") findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103" include("sdks:java:managed") findProject(":sdks:java:managed")?.name = "managed" +include("sdks:java:io:iceberg") +findProject(":sdks:java:io:iceberg")?.name = "iceberg" +include("sdks:java:io:catalog") Review Comment: It doesn't look like we add anything under "sdks:java:io:catalog".
Re: [PR] Initial Iceberg Sink [beam]
chamikaramj commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1556356489 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java: ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +class AppendFilesToTables +extends PTransform, PCollection>> { + + private final IcebergCatalogConfig catalogConfig; + + AppendFilesToTables(IcebergCatalogConfig catalogConfig) { +this.catalogConfig = catalogConfig; + } + + @Override + public PCollection> expand(PCollection writtenFiles) { + +// Apply any sharded writes and flatten everything for catalog updates +return writtenFiles +.apply( +"Key metadata updates by table", +WithKeys.of( +new SerializableFunction() { + @Override + public String apply(FileWriteResult input) { +return input.getTableIdentifier().toString(); + } +})) +// .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder())) +.apply("Group metadata updates by table", GroupByKey.create()) +.apply( +"Append metadata updates to tables", +ParDo.of(new AppendFilesToTablesDoFn(catalogConfig))) +.setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class))); + } + + private static class AppendFilesToTablesDoFn + extends DoFn>, KV> { + +private final IcebergCatalogConfig catalogConfig; + +private 
transient @MonotonicNonNull Catalog catalog; + +private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) { + this.catalogConfig = catalogConfig; +} + +private Catalog getCatalog() { + if (catalog == null) { +catalog = catalogConfig.catalog(); + } + return catalog; +} + +@ProcessElement +public void processElement( +@Element KV<String, Iterable<FileWriteResult>> element, +OutputReceiver<KV<String, Snapshot>> out, +BoundedWindow window) { + Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); + AppendFiles update = table.newAppend(); + for (FileWriteResult writtenFile : element.getValue()) { +update.appendFile(writtenFile.getDataFile()); + } + update.commit(); Review Comment: Yeah, (2) is fine. It's more about making sure that we don't double write if a work item fails. But if writing is idempotent it's simpler.
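As a rough illustration of the "idempotent" property mentioned above, the sketch below models a commit step that can safely be replayed when a work item is retried: files that were already recorded are skipped, so a retry does not append them twice. This is an assumption-laden toy, not what the PR implements and not the Iceberg API. `IdempotentAppendSketch`, `appendOnce`, and the file paths are hypothetical, and an in-memory set stands in for the table's list of data files.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a replay-safe append. Hypothetical names; not the Iceberg API. */
public class IdempotentAppendSketch {

  // Assumption: each data file has a unique path, so the path can serve as a dedup key.
  private final Set<String> committedPaths = new LinkedHashSet<>();

  /** Records each path at most once and returns how many paths were newly added. */
  synchronized int appendOnce(List<String> paths) {
    int added = 0;
    for (String path : paths) {
      if (committedPaths.add(path)) {
        added++;
      }
    }
    return added;
  }

  synchronized int size() {
    return committedPaths.size();
  }

  public static void main(String[] args) {
    IdempotentAppendSketch table = new IdempotentAppendSketch();
    List<String> bundle = Arrays.asList("f1.parquet", "f2.parquet");
    System.out.println(table.appendOnce(bundle)); // first attempt adds both files: 2
    System.out.println(table.appendOnce(bundle)); // retried work item adds nothing: 0
    System.out.println(table.size()); // table still holds exactly 2 files
  }
}
```

The design point is only that a retry should converge to the same table contents as a single successful attempt; how a real sink detects an already-applied commit is a separate question that this sketch does not answer.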
Re: [PR] Initial Iceberg Sink [beam]
kennknowles commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1552651757 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +public class IcebergIO { + + public static WriteRows writeToDynamicDestinations( Review Comment: We could do that. I was thinking that we might make convenience methods later like `writeToTable(catalog, table_id)` so I made this name extra long. We can always add others. I tend to prefer different method names rather than overloading. ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java: ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +class AppendFilesToTables +extends PTransform, PCollection>> { + + private final IcebergCatalogConfig catalogConfig; + + AppendFilesToTables(IcebergCatalogConfig catalogConfig) { +this.catalogConfig = catalogConfig; + } + + @Override + public PCollection> expand(PCollection writtenFiles) { + +// Apply any sharded writes and flatten everything for catalog updates +return writtenFiles +.apply( +"Key metadata updates by table", +WithKeys.of( +new SerializableFunction() { + @Override 
+ public String apply(FileWriteResult input) { +return input.getTableIdentifier().toString(); + } +})) +// .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder())) Review Comment: Done ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/FileWriteResult.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to
Re: [PR] Initial Iceberg Sink [beam]
chamikaramj commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1552407817 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java: ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +class AppendFilesToTables +extends PTransform, PCollection>> { + + private final IcebergCatalogConfig catalogConfig; + + AppendFilesToTables(IcebergCatalogConfig catalogConfig) { +this.catalogConfig = catalogConfig; + } + + @Override + public PCollection> expand(PCollection writtenFiles) { + +// Apply any sharded writes and flatten everything for catalog updates +return writtenFiles +.apply( +"Key metadata updates by table", +WithKeys.of( +new SerializableFunction() { + @Override + public String apply(FileWriteResult input) { +return input.getTableIdentifier().toString(); + } +})) +// .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder())) Review Comment: Uncomment or delete. ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/OneTableDynamicDestinations.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +class OneTableDynamicDestinations implements DynamicDestinations { + + private static final Schema EMPTY_SCHEMA = Schema.builder().build(); + private static final Row EMPTY_ROW = Row.nullRow(EMPTY_SCHEMA); + + // TableId represented as String for serializability + private final String tableIdString; + + private transient @MonotonicNonNull TableIdentifier tableId; + + private TableIdentifier getTableIdentifier() { +if (tableId == null) { + tableId = TableIdentifier.parse(tableIdString); +} +return tableId; + } + + OneTableDynamicDestinations(TableIdentifier tableId) { +this.tableIdString = tableId.toString(); + } + + @Override + public Schema getMetadataSchema() { +return EMPTY_SCHEMA; + } + + @Override + public Row assignDestinationMetadata(Row data) { +return EMPTY_ROW; + } + + @Override + public IcebergDestination
Re: [PR] Initial Iceberg Sink [beam]
kennknowles commented on PR #30797: URL: https://github.com/apache/beam/pull/30797#issuecomment-2037849642

> OK, I have done a whole massive revision and tested it a little bit more.
>
> The only piece that I have not revised is the `IcebergCatalogConfig`, which gets turned into an `org.apache.iceberg.catalog.Catalog` on the client and on each worker separately. I think your suggestion was to try to use just a big key-value map for all the config values. I am fine with that. I don't really know enough about it yet. All my deep dives into the Iceberg Java libraries were for other pieces.

It looks like this might work: https://github.com/tabular-io/iceberg-kafka-connect/blob/5ab5c538efab9ccf3cde166f36ba34189eed7187/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java#L256
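The "big key-value map" idea can be sketched as a small serializable holder that travels to each worker, where it would be handed to whatever catalog factory is in use. The sketch below is illustrative only: `CatalogPropertiesSketch` is a hypothetical class, not the PR's `IcebergCatalogConfig`, the property values are made up, and the Java serialization round trip merely stands in for shipping the config from the client to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical map-backed catalog config; not the PR's IcebergCatalogConfig. */
public class CatalogPropertiesSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String catalogName;
  // HashMap is Serializable, so the whole config can be shipped to workers as-is.
  private final HashMap<String, String> properties;

  CatalogPropertiesSketch(String catalogName, Map<String, String> properties) {
    this.catalogName = catalogName;
    this.properties = new HashMap<>(properties);
  }

  String catalogName() {
    return catalogName;
  }

  String get(String key) {
    return properties.get(key);
  }

  /** Simulates sending the config to a worker by serializing and deserializing it. */
  static CatalogPropertiesSketch roundTrip(CatalogPropertiesSketch config) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(config);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CatalogPropertiesSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Config did not survive serialization", e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("type", "hadoop"); // illustrative values only
    props.put("warehouse", "/tmp/warehouse");
    CatalogPropertiesSketch onWorker = roundTrip(new CatalogPropertiesSketch("demo", props));
    System.out.println(
        onWorker.catalogName() + " " + onWorker.get("type") + " " + onWorker.get("warehouse"));
  }
}
```

A plain string map keeps the config trivially serializable and avoids one typed field per catalog option, at the cost of losing compile-time checks on the keys.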
Re: [PR] Initial Iceberg Sink [beam]
kennknowles commented on PR #30797: URL: https://github.com/apache/beam/pull/30797#issuecomment-2037846183 OK, I have done a whole massive revision and tested it a little bit more. The only piece that I have not revised is the `IcebergCatalogConfig`, which gets turned into an `org.apache.iceberg.catalog.Catalog` on the client and on each worker separately. I think your suggestion was to try to use just a big key-value map for all the config values. I am fine with that. I don't really know enough about it yet. All my deep dives into the Iceberg Java libraries were for other pieces.
Re: [PR] Initial Iceberg Sink [beam]
kennknowles commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1552179077 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java: ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import java.util.Collections; +import java.util.UUID; +import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; + +class WriteToDestinations +extends PTransform< +PCollection>, IcebergWriteResult> { + + @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20; + @VisibleForTesting static final int 
DEFAULT_MAX_FILES_PER_PARTITION = 10_000; + @VisibleForTesting static final long DEFAULT_MAX_BYTES_PER_PARTITION = 10L * (1L << 40); // 10TB + static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB + static final int DEFAULT_NUM_FILE_SHARDS = 0; + static final int FILE_TRIGGERING_RECORD_COUNT = 50_000; + + private final Coder destinationCoder; + + private final RecordWriterFactory recordWriterFactory; + private final TableFactory tableFactory; + + WriteToDestinations( + Coder destinationCoder, + RecordWriterFactory recordWriterFactory, + TableFactory tableFactory) { +this.destinationCoder = destinationCoder; +this.recordWriterFactory = recordWriterFactory; +this.tableFactory = tableFactory; + } + + private PCollectionView createJobIdPrefixView(Pipeline p) { + +final String jobName = p.getOptions().getJobName(); + +return p.apply("JobIdCreationRoot_", Create.of((Void) null)) +.apply( +"CreateJobId", +ParDo.of( +new DoFn() { + @ProcessElement + public void process(ProcessContext c) { +c.output(jobName + "-" + UUID.randomUUID().toString()); + } +})) +.apply("JobIdSideInput", View.asSingleton()); + } + + @Override + public IcebergWriteResult expand( + PCollection> input) { + +final PCollectionView fileView = createJobIdPrefixView(input.getPipeline()); +// We always do the equivalent of a dynamically sharded file creation +TupleTag> writtenFilesTag = new TupleTag<>("writtenFiles"); +TupleTag, ElementT>> successfulWritesTag = +new TupleTag<>("successfulWrites"); +TupleTag, ElementT>> failedWritesTag = +new TupleTag<>("failedWrites"); +TupleTag> snapshotsTag = new TupleTag<>("snapshots"); + +final Coder elementCoder = +((KvCoder)
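A small JDK-only sketch of the job ID prefix computed in `createJobIdPrefixView` above: the job name, a hyphen, and a random UUID. In the quoted code the value is produced once and published as a singleton side input, presumably so that every worker sees the same prefix. The helper `fileNameFor` below is hypothetical and only illustrates how a shared prefix might be used; it is not taken from the PR.

```java
import java.util.UUID;

class JobIdPrefixSketch {
  /** Mirrors the quoted expression: jobName + "-" + UUID.randomUUID().toString(). */
  static String newJobIdPrefix(String jobName) {
    return jobName + "-" + UUID.randomUUID().toString();
  }

  /** Hypothetical helper: derives a per-shard file name from the shared prefix. */
  static String fileNameFor(String jobIdPrefix, int shard) {
    return jobIdPrefix + "-shard-" + shard;
  }

  public static void main(String[] args) {
    // Generated once per pipeline run, then reused for every file name.
    String prefix = newJobIdPrefix("my-job");
    System.out.println(fileNameFor(prefix, 0));
    System.out.println(fileNameFor(prefix, 1));
  }
}
```

Generating the UUID once and sharing it, rather than calling `UUID.randomUUID()` wherever a name is needed, keeps all names produced by one run under a common prefix.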
Re: [PR] Initial Iceberg Sink [beam]
codecov-commenter commented on PR #30797: URL: https://github.com/apache/beam/pull/30797#issuecomment-2037829820

## [Codecov](https://app.codecov.io/gh/apache/beam/pull/30797?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report
All modified and coverable lines are covered by tests :white_check_mark:
> Project coverage is 0.00%. Comparing base [(`61eee6d`)](https://app.codecov.io/gh/apache/beam/commit/61eee6dd672800ed88bd0851a235e7b13ee10847?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`8faea3f`)](https://app.codecov.io/gh/apache/beam/pull/30797?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
> Report is 1 commits behind head on master.

> :exclamation: Current head 8faea3f differs from pull request most recent head 5af12aa. Consider uploading reports for the commit 5af12aa to get more accurate results

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##           master   #30797        +/-   ##
============================================
- Coverage   70.74%        0    -70.75%
============================================
  Files        1256        0      -1256
  Lines      140769        0    -140769
  Branches     4307        0      -4307
============================================
- Hits        99592        0     -99592
+ Misses      37700        0     -37700
+ Partials     3477        0      -3477
```

| [Flag](https://app.codecov.io/gh/apache/beam/pull/30797/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [go](https://app.codecov.io/gh/apache/beam/pull/30797/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `?` | |
| [java](https://app.codecov.io/gh/apache/beam/pull/30797/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `?` | |
| [python](https://app.codecov.io/gh/apache/beam/pull/30797/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `?` | |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/30797?dropdown=coverage=pr=continue_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
:loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
Re: [PR] Initial Iceberg Sink [beam]
kennknowles commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1547049828 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java: ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import java.util.Arrays; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.checkerframework.checker.nullness.qual.NonNull; + +public class IcebergIO { + + public static Write writeToDestinations( + IcebergCatalog catalog, + DynamicDestinations dynamicDestinations, + SerializableBiFunction toRecord) { +return new Write<>(catalog, dynamicDestinations, toRecord); + } + + public static TableFactory forCatalog(final IcebergCatalog catalog) { Review Comment: TBD. Leaving all "catalog" questions unresolved for this revision. 
## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java: ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import java.util.Collections; +import java.util.UUID; +import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import 
org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; + +class WriteToDestinations +extends PTransform< +PCollection>, IcebergWriteResult> { + + @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20; + @VisibleForTesting static final int DEFAULT_MAX_FILES_PER_PARTITION = 10_000; +
Re: [PR] Initial Iceberg Sink [beam]
codecov[bot] commented on PR #30797: URL: https://github.com/apache/beam/pull/30797#issuecomment-2030943008

## [Codecov](https://app.codecov.io/gh/apache/beam/pull/30797?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report
All modified and coverable lines are covered by tests :white_check_mark:
> Project coverage is 71.47%. Comparing base [(`069c045`)](https://app.codecov.io/gh/apache/beam/commit/069c0459c2c17c5ef7c393d0ce77182208614f8d?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`a06a187`)](https://app.codecov.io/gh/apache/beam/pull/30797?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
> Report is 19 commits behind head on master.

> :exclamation: Current head a06a187 differs from pull request most recent head 2cffca8. Consider uploading reports for the commit 2cffca8 to get more accurate results

Additional details and impacted files

```diff
@@           Coverage Diff           @@
##           master   #30797   +/-   ##
=======================================
  Coverage   71.47%   71.47%
=======================================
  Files         710      710
  Lines      104815   104815
=======================================
  Hits        74915    74915
  Misses      28268    28268
  Partials     1632     1632
```

[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/30797?dropdown=coverage=pr=continue_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
:loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
Re: [PR] Initial Iceberg Sink [beam]
chamikaramj commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1544853193 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/DynamicDestinations.java: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.checkerframework.checker.nullness.qual.Nullable; + +public abstract class DynamicDestinations implements Serializable { Review Comment: Seems like this has a lot of copied over logic from BQ dynamic destinations which probably we can simplify/change if we went with the new DLQ framework. https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java: ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import java.util.Collections; +import java.util.UUID; +import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import
Re: [PR] Initial Iceberg Sink [beam]
chamikaramj commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1544706525 ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java: ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import java.util.Arrays; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.checkerframework.checker.nullness.qual.NonNull; + +public class IcebergIO { + + public static Write writeToDestinations( + IcebergCatalog catalog, + DynamicDestinations dynamicDestinations, Review Comment: I'm wondering if we can strip dynamic destinations based on UDFs out and think about how to introduce dynamic destinations to this I/O in a portable way based on https://s.apache.org/portable-dynamic-destinations ## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java: ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import java.util.Collections; +import java.util.UUID; +import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; + +class WriteToDestinations +extends PTransform< +PCollection>, IcebergWriteResult> { + + @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20; Review Comment: Any idea how we got to these defaults ? 
(if so we should document) ##
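As a possible starting point for documenting these defaults, here is a JDK-only sketch that evaluates the two byte-size constants quoted earlier in this thread. It does not explain where the values came from. It only shows what they evaluate to: the inline comments in the PR say "10TB" and "1TB", but `1L << 40` is 2^40 bytes, i.e. a binary terabyte (TiB). The class name `DefaultSizesSketch` and the helper methods are made up for this example.

```java
class DefaultSizesSketch {
  // Same expressions as the constants quoted from WriteToDestinations.
  static final long DEFAULT_MAX_BYTES_PER_PARTITION = 10L * (1L << 40);
  static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40);

  /** Converts a byte count to whole TiB (2^40 bytes), discarding any remainder. */
  static long wholeTiB(long bytes) {
    return bytes >> 40;
  }

  /**
   * What the file-size expression would yield with int literals instead of long:
   * Java masks an int shift distance to its low 5 bits, so 1 << 40 is 1 << 8.
   */
  static int intShiftPitfall() {
    return 1 << 40;
  }

  public static void main(String[] args) {
    System.out.println(DEFAULT_MAX_BYTES_PER_FILE + " bytes = "
        + wholeTiB(DEFAULT_MAX_BYTES_PER_FILE) + " TiB");
    System.out.println(DEFAULT_MAX_BYTES_PER_PARTITION + " bytes = "
        + wholeTiB(DEFAULT_MAX_BYTES_PER_PARTITION) + " TiB");
    System.out.println("int version of 1 << 40: " + intShiftPitfall());
  }
}
```

The `L` suffixes in the original constants matter: without them the shift would be evaluated in `int` arithmetic and silently produce a much smaller limit.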
Re: [PR] Initial Iceberg Sink [beam]
kennknowles commented on code in PR #30797: URL: https://github.com/apache/beam/pull/30797#discussion_r1543839400 ## buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy: ## @@ -1151,7 +1151,7 @@ class BeamModulePlugin implements Plugin { options.compilerArgs += ([ '-parameters', '-Xlint:all', - '-Werror' +// '-Werror' Review Comment: lol I missed this one
Re: [PR] Initial Iceberg Sink [beam]
github-actions[bot] commented on PR #30797: URL: https://github.com/apache/beam/pull/30797#issuecomment-2026314950 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] Initial Iceberg Sink [beam]
kennknowles commented on PR #30797: URL: https://github.com/apache/beam/pull/30797#issuecomment-2026313742 R: @chamikaramj