This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0eefb797765 [YAML] Expose flatten implementation from Java. (#30057)
0eefb797765 is described below

commit 0eefb797765a8fdabb43443d934cf7bf3fbbcc69
Author: Robert Bradshaw <rober...@gmail.com>
AuthorDate: Tue Jan 23 23:16:43 2024 -0800

    [YAML] Expose flatten implementation from Java. (#30057)
    
    Technically Flatten will produce exactly the same result no matter where
    it is expanded, but simply having this in Java avoids complexities around
    dealing with affinity optimization.
---
 .../providers/FlattenTransformProvider.java        | 100 +++++++++++++++++++++
 .../apache_beam/yaml/standard_providers.yaml       |  26 ++++++
 2 files changed, 126 insertions(+)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java
new file mode 100644
index 00000000000..baeccd1ac8c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms.providers;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for Flatten.
+ *
+ * <p>The transform it builds merges every {@code PCollection<Row>} of the incoming {@link
+ * PCollectionRowTuple}, regardless of tag, into a single {@code PCollection<Row>} that is returned
+ * under the {@code "output"} tag. The transform takes no configuration options.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FlattenTransformProvider
+    extends TypedSchemaTransformProvider<FlattenTransformProvider.Configuration> {
+  /** Tag under which the flattened collection is placed in the output tuple. */
+  protected static final String OUTPUT_ROWS_TAG = "output";
+
+  /** Returns the configuration class for this provider, {@link Configuration}. */
+  @Override
+  protected Class<Configuration> configurationClass() {
+    return Configuration.class;
+  }
+
+  /**
+   * Builds the flatten transform.
+   *
+   * @param configuration not read by the transform; {@link Configuration} declares no properties
+   * @return a {@link SchemaTransform} that flattens all of its inputs into the {@code "output"}
+   *     collection
+   */
+  @Override
+  protected SchemaTransform from(Configuration configuration) {
+    return new SchemaTransform() {
+      /**
+       * Flattens all collections in {@code input} into one.
+       *
+       * @throws IllegalArgumentException if {@code input} contains no collections
+       */
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        // getAll() is already typed as Map<String, PCollection<Row>>, so the inputs can be read
+        // once without going through expand() and raw/unchecked casts.
+        Collection<PCollection<Row>> inputs = input.getAll().values();
+        if (inputs.isEmpty()) {
+          throw new IllegalArgumentException(
+              "Flatten requires at least one input PCollection, but none were provided.");
+        }
+        // The output takes the schema of the first input.
+        // NOTE(review): this presumes all inputs share that schema; it is not verified here.
+        PCollection<Row> flattened =
+            PCollectionList.of(inputs)
+                .apply(Flatten.pCollections())
+                .setRowSchema(inputs.iterator().next().getSchema());
+        return PCollectionRowTuple.of(OUTPUT_ROWS_TAG, flattened);
+      }
+    };
+  }
+
+  /** Returns the URN under which this transform is registered. */
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:yaml:flatten:v1";
+  }
+
+  /**
+   * Returns an empty list: no input names are declared, and {@code expand} consumes every
+   * collection present in the input tuple whatever its tag.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /** Returns the single output name, {@link #OUTPUT_ROWS_TAG}. */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_ROWS_TAG);
+  }
+
+  /** Configuration for the flatten transform; it currently declares no properties. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Configuration {
+
+    /** Returns a new builder for {@link Configuration}. */
+    public static Builder builder() {
+      return new AutoValue_FlattenTransformProvider_Configuration.Builder();
+    }
+
+    /** Builder for {@link Configuration}. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      /** Builds the {@link Configuration}. */
+      public abstract Configuration build();
+    }
+  }
+}
diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml
index c4671fa93fc..89b0cc9d553 100644
--- a/sdks/python/apache_beam/yaml/standard_providers.yaml
+++ b/sdks/python/apache_beam/yaml/standard_providers.yaml
@@ -24,6 +24,32 @@
     version: BEAM_VERSION
   transforms:
      Sql: 'beam:external:java:sql:v1'
+     Flatten: 'beam:schematransform:org.apache.beam:yaml:flatten:v1'
+     LogForTesting: 'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1'
+
+# TODO(robertwb): Auto-detect redundantly provided transforms for maximal fusion.
+- type: 'beamJar'
+  config:
+    gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+    version: BEAM_VERSION
+  transforms:
+     Flatten: 'beam:schematransform:org.apache.beam:yaml:flatten:v1'
+     LogForTesting: 'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1'
+
+- type: 'beamJar'
+  config:
+    gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+    version: BEAM_VERSION
+  transforms:
+     Flatten: 'beam:schematransform:org.apache.beam:yaml:flatten:v1'
+     LogForTesting: 'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1'
+
+- type: 'beamJar'
+  config:
+    gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
+    version: BEAM_VERSION
+  transforms:
+     Flatten: 'beam:schematransform:org.apache.beam:yaml:flatten:v1'
      LogForTesting: 'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1'
 
 - type: renaming

Reply via email to