Re: [PR] Initial Iceberg Sink [beam]

2024-05-10 Thread via GitHub


kennknowles commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1597044554


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class AppendFilesToTables
+    extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {
+
+  private final IcebergCatalogConfig catalogConfig;
+
+  AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
+    this.catalogConfig = catalogConfig;
+  }
+
+  @Override
+  public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {
+
+    // Apply any sharded writes and flatten everything for catalog updates
+    return writtenFiles
+        .apply(
+            "Key metadata updates by table",
+            WithKeys.of(
+                new SerializableFunction<FileWriteResult, String>() {
+                  @Override
+                  public String apply(FileWriteResult input) {
+                    return input.getTableIdentifier().toString();
+                  }
+                }))
+        // .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder()))
+        .apply("Group metadata updates by table", GroupByKey.create())
+        .apply(
+            "Append metadata updates to tables",
+            ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class)));
+  }
+
+  private static class AppendFilesToTablesDoFn
+      extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, Snapshot>> {
+
+    private final IcebergCatalogConfig catalogConfig;
+
+    private transient @MonotonicNonNull Catalog catalog;
+
+    private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
+      this.catalogConfig = catalogConfig;
+    }
+
+    private Catalog getCatalog() {
+      if (catalog == null) {
+        catalog = catalogConfig.catalog();
+      }
+      return catalog;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, Iterable<FileWriteResult>> element,
+        OutputReceiver<KV<String, Snapshot>> out,
+        BoundedWindow window) {
+      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
+      AppendFiles update = table.newAppend();
+      for (FileWriteResult writtenFile : element.getValue()) {
+        update.appendFile(writtenFile.getDataFile());
+      }
+      update.commit();

Review Comment:
   I am not that familiar with the Iceberg libraries, but I was under the 
impression that the optimistic concurrency protocol is handled by them: see 
https://iceberg.apache.org/docs/1.5.2/reliability/#concurrent-write-operations, 
and, for file-system tables, https://iceberg.apache.org/spec/#file-system-tables.
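
   A minimal sketch of the optimistic-retry behavior the linked docs describe 
(table name and retry count below are illustrative): `PendingUpdate.commit()` 
refreshes the base metadata and reapplies the change on conflict, up to the 
table's configured retry budget. For file-system tables, the atomic rename of 
the metadata file provides the conflict detection.
   
   ```java
   import org.apache.iceberg.AppendFiles;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.catalog.Catalog;
   import org.apache.iceberg.catalog.TableIdentifier;
   
   class CommitRetrySketch {
     static void appendWithRetries(Catalog catalog, DataFile dataFile) {
       Table table = catalog.loadTable(TableIdentifier.parse("db.events"));
   
       // Optional: raise the commit retry budget from its default of 4.
       table.updateProperties().set("commit.retry.num-retries", "10").commit();
   
       AppendFiles append = table.newAppend();
       append.appendFile(dataFile);
       // On a concurrent commit, this refreshes metadata and retries the
       // append; it only throws after the configured retries are exhausted.
       append.commit();
     }
   }
   ```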



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-05-08 Thread via GitHub


jbonofre commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1594958925


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class AppendFilesToTables
+    extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {
+
+  private final IcebergCatalogConfig catalogConfig;
+
+  AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
+    this.catalogConfig = catalogConfig;
+  }
+
+  @Override
+  public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {
+
+    // Apply any sharded writes and flatten everything for catalog updates
+    return writtenFiles
+        .apply(
+            "Key metadata updates by table",
+            WithKeys.of(
+                new SerializableFunction<FileWriteResult, String>() {
+                  @Override
+                  public String apply(FileWriteResult input) {
+                    return input.getTableIdentifier().toString();
+                  }
+                }))
+        // .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder()))
+        .apply("Group metadata updates by table", GroupByKey.create())
+        .apply(
+            "Append metadata updates to tables",
+            ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class)));
+  }
+
+  private static class AppendFilesToTablesDoFn
+      extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, Snapshot>> {
+
+    private final IcebergCatalogConfig catalogConfig;
+
+    private transient @MonotonicNonNull Catalog catalog;
+
+    private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
+      this.catalogConfig = catalogConfig;
+    }
+
+    private Catalog getCatalog() {
+      if (catalog == null) {
+        catalog = catalogConfig.catalog();
+      }
+      return catalog;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, Iterable<FileWriteResult>> element,
+        OutputReceiver<KV<String, Snapshot>> out,
+        BoundedWindow window) {
+      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
+      AppendFiles update = table.newAppend();
+      for (FileWriteResult writtenFile : element.getValue()) {
+        update.appendFile(writtenFile.getDataFile());
+      }
+      update.commit();

Review Comment:
   Sorry to be late on this. I'm just wondering whether we need a kind of 
"commit coordinator" to make sure we have one commit at a time: concurrent 
commits could be problematic in Iceberg.
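
   A rough sketch of the "commit coordinator" idea (illustrative, not the PR's 
code): route every table's results through one constant key, so a single 
grouped element, and hence a single worker, performs all commits sequentially. 
Note the per-table GroupByKey in the diff above already yields one commit call 
per table per pane; Iceberg's own optimistic retries cover the remaining races.
   
   ```java
   // Assumes the FileWriteResult type and imports from the diff above.
   static PCollection<KV<String, Iterable<FileWriteResult>>> serializeCommits(
       PCollection<FileWriteResult> writtenFiles) {
     return writtenFiles
         .apply("AllToOneKey", WithKeys.of("commit-coordinator"))
         .setCoder(KvCoder.of(StringUtf8Coder.of(), writtenFiles.getCoder()))
         .apply("OneWorkerCommits", GroupByKey.create());
     // A downstream DoFn would iterate the single grouped element and commit
     // one table at a time, so no two commits ever run concurrently.
   }
   ```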



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-04-09 Thread via GitHub


kennknowles merged PR #30797:
URL: https://github.com/apache/beam/pull/30797


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-04-09 Thread via GitHub


kennknowles commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1557812336


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static org.apache.beam.io.iceberg.RowHelper.rowToRecord;
+
+import java.io.IOException;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+
+class RecordWriter {
+
+  private final DataWriter<Record> icebergDataWriter;
+
+  private final Table table;
+
+  RecordWriter(Catalog catalog, IcebergDestination destination, String filename)
+      throws IOException {
+    this(catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename);
+  }
+
+  RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException {
+    this.table = table;
+
+    String absoluteFilename = table.location() + "/" + filename;
+    OutputFile outputFile = table.io().newOutputFile(absoluteFilename);
+    switch (fileFormat) {
+      case AVRO:
+        icebergDataWriter =
+            Avro.writeData(outputFile)
+                .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
+                .schema(table.schema())
+                .withSpec(table.spec())
+                .overwrite()
+                .build();
+        break;
+      case PARQUET:
+        icebergDataWriter =
+            Parquet.writeData(outputFile)
+                .createWriterFunc(GenericParquetWriter::buildWriter)
+                .schema(table.schema())
+                .withSpec(table.spec())
+                .overwrite()
+                .build();
+        break;
+      case ORC:
+        throw new UnsupportedOperationException("ORC file format not currently supported.");
+        // icebergDataWriter =

Review Comment:
   Done



##
settings.gradle.kts:
##
@@ -355,3 +355,7 @@ include("sdks:java:io:kafka:kafka-01103")
 findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103"
 include("sdks:java:managed")
 findProject(":sdks:java:managed")?.name = "managed"
+include("sdks:java:io:iceberg")
+findProject(":sdks:java:io:iceberg")?.name = "iceberg"
+include("sdks:java:io:catalog")

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-04-08 Thread via GitHub


chamikaramj commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1556446483


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static org.apache.beam.io.iceberg.RowHelper.rowToRecord;
+
+import java.io.IOException;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+
+class RecordWriter {
+
+  private final DataWriter<Record> icebergDataWriter;
+
+  private final Table table;
+
+  RecordWriter(Catalog catalog, IcebergDestination destination, String filename)
+      throws IOException {
+    this(catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename);
+  }
+
+  RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException {
+    this.table = table;
+
+    String absoluteFilename = table.location() + "/" + filename;
+    OutputFile outputFile = table.io().newOutputFile(absoluteFilename);
+    switch (fileFormat) {
+      case AVRO:
+        icebergDataWriter =
+            Avro.writeData(outputFile)
+                .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
+                .schema(table.schema())
+                .withSpec(table.spec())
+                .overwrite()
+                .build();
+        break;
+      case PARQUET:
+        icebergDataWriter =
+            Parquet.writeData(outputFile)
+                .createWriterFunc(GenericParquetWriter::buildWriter)
+                .schema(table.schema())
+                .withSpec(table.spec())
+                .overwrite()
+                .build();
+        break;
+      case ORC:
+        throw new UnsupportedOperationException("ORC file format not currently supported.");
+        // icebergDataWriter =

Review Comment:
   Delete?



##
settings.gradle.kts:
##
@@ -355,3 +355,7 @@ include("sdks:java:io:kafka:kafka-01103")
 findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103"
 include("sdks:java:managed")
 findProject(":sdks:java:managed")?.name = "managed"
+include("sdks:java:io:iceberg")
+findProject(":sdks:java:io:iceberg")?.name = "iceberg"
+include("sdks:java:io:catalog")

Review Comment:
   It doesn't look like we add anything under "sdks:java:io:catalog".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-04-08 Thread via GitHub


chamikaramj commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1556356489


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class AppendFilesToTables
+    extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {
+
+  private final IcebergCatalogConfig catalogConfig;
+
+  AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
+    this.catalogConfig = catalogConfig;
+  }
+
+  @Override
+  public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {
+
+    // Apply any sharded writes and flatten everything for catalog updates
+    return writtenFiles
+        .apply(
+            "Key metadata updates by table",
+            WithKeys.of(
+                new SerializableFunction<FileWriteResult, String>() {
+                  @Override
+                  public String apply(FileWriteResult input) {
+                    return input.getTableIdentifier().toString();
+                  }
+                }))
+        // .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder()))
+        .apply("Group metadata updates by table", GroupByKey.create())
+        .apply(
+            "Append metadata updates to tables",
+            ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class)));
+  }
+
+  private static class AppendFilesToTablesDoFn
+      extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, Snapshot>> {
+
+    private final IcebergCatalogConfig catalogConfig;
+
+    private transient @MonotonicNonNull Catalog catalog;
+
+    private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
+      this.catalogConfig = catalogConfig;
+    }
+
+    private Catalog getCatalog() {
+      if (catalog == null) {
+        catalog = catalogConfig.catalog();
+      }
+      return catalog;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, Iterable<FileWriteResult>> element,
+        OutputReceiver<KV<String, Snapshot>> out,
+        BoundedWindow window) {
+      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
+      AppendFiles update = table.newAppend();
+      for (FileWriteResult writtenFile : element.getValue()) {
+        update.appendFile(writtenFile.getDataFile());
+      }
+      update.commit();

Review Comment:
   Yeah, (2) is fine. It's more about making sure that we don't double-write if 
a work item fails. But if writing is idempotent, it's simpler.
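
   A hedged sketch of the idempotency point (the naming scheme and helper are 
illustrative, not the PR's): if a retried work item deterministically 
recomputes the same file path, the writer's `overwrite()` turns re-execution 
into a replace rather than a duplicate data file.
   
   ```java
   // Assumes the imports from RecordWriter.java elsewhere in this thread.
   static DataWriter<Record> openIdempotentWriter(
       Table table, String jobId, String tableId, int shard) throws IOException {
     // Deterministic path: a retry of the same (job, table, shard) hits the
     // same file, so the overwrite below replaces any partial earlier attempt.
     String path =
         String.format("%s/data/%s-%s-%05d.parquet", table.location(), jobId, tableId, shard);
     OutputFile outputFile = table.io().newOutputFile(path);
     return Parquet.writeData(outputFile)
         .createWriterFunc(GenericParquetWriter::buildWriter)
         .schema(table.schema())
         .withSpec(table.spec())
         .overwrite() // replace, don't duplicate, on retry
         .build();
   }
   ```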



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-04-08 Thread via GitHub


kennknowles commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1552651757


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+public class IcebergIO {
+
+  public static WriteRows writeToDynamicDestinations(

Review Comment:
   We could do that. I was thinking that we might add convenience methods 
later, like `writeToTable(catalog, table_id)`, so I made this name extra long. 
We can always add others. I tend to prefer distinct method names over 
overloading.
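
   A sketch of what such a convenience method might look like; the wrapper is 
hypothetical, and since the parameter list of `writeToDynamicDestinations` is 
elided in the quoted diff, its use here is an assumption.
   
   ```java
   public static WriteRows writeToTable(IcebergCatalogConfig catalog, String tableId) {
     // OneTableDynamicDestinations (shown later in this thread) routes every
     // row to the single named table.
     return writeToDynamicDestinations(
         catalog, new OneTableDynamicDestinations(TableIdentifier.parse(tableId)));
   }
   ```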



##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class AppendFilesToTables
+    extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {
+
+  private final IcebergCatalogConfig catalogConfig;
+
+  AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
+    this.catalogConfig = catalogConfig;
+  }
+
+  @Override
+  public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {
+
+    // Apply any sharded writes and flatten everything for catalog updates
+    return writtenFiles
+        .apply(
+            "Key metadata updates by table",
+            WithKeys.of(
+                new SerializableFunction<FileWriteResult, String>() {
+                  @Override
+                  public String apply(FileWriteResult input) {
+                    return input.getTableIdentifier().toString();
+                  }
+                }))
+        // .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder()))

Review Comment:
   Done



##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/FileWriteResult.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to 

Re: [PR] Initial Iceberg Sink [beam]

2024-04-04 Thread via GitHub


chamikaramj commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1552407817


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class AppendFilesToTables
+    extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {
+
+  private final IcebergCatalogConfig catalogConfig;
+
+  AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
+    this.catalogConfig = catalogConfig;
+  }
+
+  @Override
+  public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {
+
+    // Apply any sharded writes and flatten everything for catalog updates
+    return writtenFiles
+        .apply(
+            "Key metadata updates by table",
+            WithKeys.of(
+                new SerializableFunction<FileWriteResult, String>() {
+                  @Override
+                  public String apply(FileWriteResult input) {
+                    return input.getTableIdentifier().toString();
+                  }
+                }))
+        // .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder()))

Review Comment:
   Uncomment or delete.



##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/OneTableDynamicDestinations.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class OneTableDynamicDestinations implements DynamicDestinations {
+
+  private static final Schema EMPTY_SCHEMA = Schema.builder().build();
+  private static final Row EMPTY_ROW = Row.nullRow(EMPTY_SCHEMA);
+
+  // TableId represented as String for serializability
+  private final String tableIdString;
+
+  private transient @MonotonicNonNull TableIdentifier tableId;
+
+  private TableIdentifier getTableIdentifier() {
+    if (tableId == null) {
+      tableId = TableIdentifier.parse(tableIdString);
+    }
+    return tableId;
+  }
+
+  OneTableDynamicDestinations(TableIdentifier tableId) {
+    this.tableIdString = tableId.toString();
+  }
+
+  @Override
+  public Schema getMetadataSchema() {
+    return EMPTY_SCHEMA;
+  }
+
+  @Override
+  public Row assignDestinationMetadata(Row data) {
+    return EMPTY_ROW;
+  }
+
+  @Override
+  public IcebergDestination 

Re: [PR] Initial Iceberg Sink [beam]

2024-04-04 Thread via GitHub


kennknowles commented on PR #30797:
URL: https://github.com/apache/beam/pull/30797#issuecomment-2037849642

   > OK I have done a whole massive revision and tested it a little bit more.
   > 
   > The only piece that I have not revised is the `IcebergCatalogConfig`, 
which gets turned into an `org.apache.iceberg.catalog.Catalog` on the client 
and on each worker separately. I think your suggestion was to try to use just a 
big key-value map for all the config values. I am fine with that. I don't 
really know enough about it yet. All my deep dives into the Iceberg Java 
libraries were for other pieces.
   
   It looks like this might work: 
https://github.com/tabular-io/iceberg-kafka-connect/blob/5ab5c538efab9ccf3cde166f36ba34189eed7187/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java#L256
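
   A minimal sketch of the "big key-value map" approach the linked 
IcebergSinkConfig takes: a `Map<String, String>` is trivially serializable, and 
Iceberg's `CatalogUtil.buildIcebergCatalog` can rebuild the `Catalog` from it 
on each worker. The property values below are illustrative.
   
   ```java
   import java.util.Map;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.CatalogUtil;
   import org.apache.iceberg.catalog.Catalog;
   
   class CatalogFromProperties {
     static Catalog build() {
       Map<String, String> props =
           Map.of(
               "type", "hadoop", // or "hive", or a "catalog-impl" class name
               "warehouse", "gs://example-bucket/warehouse"); // illustrative
       // Rebuilds the catalog from plain strings; safe to call per worker.
       return CatalogUtil.buildIcebergCatalog("beam-sink", props, new Configuration());
     }
   }
   ```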


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-04-04 Thread via GitHub


kennknowles commented on PR #30797:
URL: https://github.com/apache/beam/pull/30797#issuecomment-2037846183

   OK I have done a whole massive revision and tested it a little bit more.
   
   The only piece that I have not revised is the `IcebergCatalogConfig`, which 
gets turned into an `org.apache.iceberg.catalog.Catalog` on the client and on 
each worker separately. I think your suggestion was to try to use just a big 
key-value map for all the config values. I am fine with that. I don't really 
know enough about it yet. All my deep dives into the Iceberg Java libraries 
were for other pieces.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-04-04 Thread via GitHub


kennknowles commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1552179077


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import java.util.Collections;
+import java.util.UUID;
+import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ShardedKey;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+class WriteToDestinations<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>, IcebergWriteResult<DestinationT, ElementT>> {
+
+  @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20;
+  @VisibleForTesting static final int DEFAULT_MAX_FILES_PER_PARTITION = 10_000;
+  @VisibleForTesting
+  static final long DEFAULT_MAX_BYTES_PER_PARTITION = 10L * (1L << 40); // 10TB
+  static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB
+  static final int DEFAULT_NUM_FILE_SHARDS = 0;
+  static final int FILE_TRIGGERING_RECORD_COUNT = 50_000;
+
+  private final Coder<DestinationT> destinationCoder;
+
+  private final RecordWriterFactory<DestinationT, ElementT> recordWriterFactory;
+  private final TableFactory tableFactory;
+
+  WriteToDestinations(
+      Coder<DestinationT> destinationCoder,
+      RecordWriterFactory<DestinationT, ElementT> recordWriterFactory,
+      TableFactory tableFactory) {
+    this.destinationCoder = destinationCoder;
+    this.recordWriterFactory = recordWriterFactory;
+    this.tableFactory = tableFactory;
+  }
+
+  private PCollectionView<String> createJobIdPrefixView(Pipeline p) {
+
+    final String jobName = p.getOptions().getJobName();
+
+    return p.apply("JobIdCreationRoot_", Create.of((Void) null))
+        .apply(
+            "CreateJobId",
+            ParDo.of(
+                new DoFn<Void, String>() {
+                  @ProcessElement
+                  public void process(ProcessContext c) {
+                    c.output(jobName + "-" + UUID.randomUUID().toString());
+                  }
+                }))
+        .apply("JobIdSideInput", View.asSingleton());
+  }
+
+  @Override
+  public IcebergWriteResult<DestinationT, ElementT> expand(
+      PCollection<KV<DestinationT, ElementT>> input) {
+
+    final PCollectionView<String> fileView = createJobIdPrefixView(input.getPipeline());
+    // We always do the equivalent of a dynamically sharded file creation
+    TupleTag<Result<DestinationT>> writtenFilesTag = new TupleTag<>("writtenFiles");
+    TupleTag<KV<ShardedKey<DestinationT>, ElementT>> successfulWritesTag =
+        new TupleTag<>("successfulWrites");
+    TupleTag<KV<ShardedKey<DestinationT>, ElementT>> failedWritesTag =
+        new TupleTag<>("failedWrites");
+    TupleTag<KV<String, Snapshot>> snapshotsTag = new TupleTag<>("snapshots");
+
+    final Coder elementCoder =
+        ((KvCoder) 

Re: [PR] Initial Iceberg Sink [beam]

2024-04-04 Thread via GitHub


codecov-commenter commented on PR #30797:
URL: https://github.com/apache/beam/pull/30797#issuecomment-2037829820

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/30797) Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Project coverage is 0.00%. Comparing base 
[(`61eee6d`)](https://app.codecov.io/gh/apache/beam/commit/61eee6dd672800ed88bd0851a235e7b13ee10847)
 to head 
[(`8faea3f`)](https://app.codecov.io/gh/apache/beam/pull/30797).
   > Report is 1 commit behind head on master.
   
   > :exclamation: Current head 8faea3f differs from pull request most recent 
head 5af12aa. Consider uploading reports for the commit 5af12aa to get more 
accurate results
   
   
   Additional details and impacted files
   
   
   ```diff
   @@             Coverage Diff              @@
   ##           master   #30797        +/-   ##
   ============================================
   - Coverage   70.74%        0    -70.75%
   ============================================
     Files        1256        0      -1256
     Lines      140769        0    -140769
     Branches     4307        0      -4307
   ============================================
     Hits        99592        0     -99592
     Misses      37700        0     -37700
     Partials     3477        0      -3477
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/30797/flags) | Coverage Δ | |
   |---|---|---|
   | [go](https://app.codecov.io/gh/apache/beam/pull/30797/flags) | `?` | |
   | [java](https://app.codecov.io/gh/apache/beam/pull/30797/flags) | `?` | |
   | [python](https://app.codecov.io/gh/apache/beam/pull/30797/flags) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   
   
   
   [:umbrella: View full report in Codecov by 
Sentry](https://app.codecov.io/gh/apache/beam/pull/30797).
   
   :loudspeaker: Have feedback on the report? [Share it 
here](https://about.codecov.io/codecov-pr-comment-feedback/).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-04-01 Thread via GitHub


kennknowles commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1547049828


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class IcebergIO {
+
+  public static  Write 
writeToDestinations(
+  IcebergCatalog catalog,
+  DynamicDestinations dynamicDestinations,
+  SerializableBiFunction toRecord) {
+return new Write<>(catalog, dynamicDestinations, toRecord);
+  }
+
+  public static TableFactory forCatalog(final IcebergCatalog catalog) {

Review Comment:
   TBD. Leaving all "catalog" questions unresolved for this revision.



##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import java.util.Collections;
+import java.util.UUID;
+import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ShardedKey;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+class WriteToDestinations<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>, IcebergWriteResult<DestinationT, ElementT>> {
+
+  @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20;
+  @VisibleForTesting static final int DEFAULT_MAX_FILES_PER_PARTITION = 10_000;
+  

Re: [PR] Initial Iceberg Sink [beam]

2024-04-01 Thread via GitHub


codecov[bot] commented on PR #30797:
URL: https://github.com/apache/beam/pull/30797#issuecomment-2030943008

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/30797) Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Project coverage is 71.47%. Comparing base 
[(`069c045`)](https://app.codecov.io/gh/apache/beam/commit/069c0459c2c17c5ef7c393d0ce77182208614f8d)
 to head 
[(`a06a187`)](https://app.codecov.io/gh/apache/beam/pull/30797).
   > Report is 19 commits behind head on master.
   
   > :exclamation: Current head a06a187 differs from pull request most recent 
head 2cffca8. Consider uploading reports for the commit 2cffca8 to get more 
accurate results
   
   
   Additional details and impacted files
   
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #30797   +/-   ##
   =======================================
     Coverage   71.47%   71.47%
   =======================================
     Files         710      710
     Lines      104815   104815
   =======================================
     Hits        74915    74915
     Misses      28268    28268
     Partials     1632     1632
   ```
   
   
   
   
   
   [:umbrella: View full report in Codecov by 
Sentry](https://app.codecov.io/gh/apache/beam/pull/30797).
   
   :loudspeaker: Have feedback on the report? [Share it 
here](https://about.codecov.io/codecov-pr-comment-feedback/).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-03-29 Thread via GitHub


chamikaramj commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1544853193


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/DynamicDestinations.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static 
org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public abstract class DynamicDestinations implements 
Serializable {

Review Comment:
   Seems like this has a lot of copied over logic from BQ dynamic destinations 
which probably we can simplify/change if we went with the new DLQ framework.
   
   
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java



##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import java.util.Collections;
+import java.util.UUID;
+import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ShardedKey;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 

Re: [PR] Initial Iceberg Sink [beam]

2024-03-29 Thread via GitHub


chamikaramj commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1544706525


##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class IcebergIO {
+
+  public static  Write 
writeToDestinations(
+  IcebergCatalog catalog,
+  DynamicDestinations dynamicDestinations,

Review Comment:
   I'm wondering if we can strip out the UDF-based dynamic destinations and 
think about how to introduce dynamic destinations to this I/O in a portable 
way, based on https://s.apache.org/portable-dynamic-destinations
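
   A hedged sketch of the portable direction (my reading of the linked doc; the 
template syntax below is illustrative): the destination is derived from element 
fields via a string pattern rather than a serialized Java UDF, so any SDK can 
express it.
   
   ```java
   import org.apache.beam.sdk.values.Row;
   
   class DestinationTemplates {
     // e.g. resolve("logs.events_{region}", row with region="eu")
     //      -> "logs.events_eu"
     static String resolve(String template, Row element) {
       String result = template;
       for (String field : element.getSchema().getFieldNames()) {
         result = result.replace("{" + field + "}", String.valueOf(element.getValue(field)));
       }
       return result;
     }
   }
   ```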



##
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import java.util.Collections;
+import java.util.UUID;
+import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ShardedKey;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+class WriteToDestinations<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>, IcebergWriteResult<DestinationT, ElementT>> {
+
+  @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20;

Review Comment:
   Any idea how we got to these defaults ? (if so we should document)



##

Re: [PR] Initial Iceberg Sink [beam]

2024-03-28 Thread via GitHub


kennknowles commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1543839400


##
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##
@@ -1151,7 +1151,7 @@ class BeamModulePlugin implements Plugin {
 options.compilerArgs += ([
   '-parameters',
   '-Xlint:all',
-  '-Werror'
+//  '-Werror'

Review Comment:
   lol I missed this one



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-03-28 Thread via GitHub


github-actions[bot] commented on PR #30797:
URL: https://github.com/apache/beam/pull/30797#issuecomment-2026314950

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Initial Iceberg Sink [beam]

2024-03-28 Thread via GitHub


kennknowles commented on PR #30797:
URL: https://github.com/apache/beam/pull/30797#issuecomment-2026313742

   R: @chamikaramj 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org