This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 0eefb797765 [YAML] Expose flatten implementation from Java. (#30057) 0eefb797765 is described below commit 0eefb797765a8fdabb43443d934cf7bf3fbbcc69 Author: Robert Bradshaw <rober...@gmail.com> AuthorDate: Tue Jan 23 23:16:43 2024 -0800 [YAML] Expose flatten implementation from Java. (#30057) Technically Flatten will produce exactly the same result no matter where it is expanded, but simply having this in Java avoids complexities around dealing with affinity optimization. --- .../providers/FlattenTransformProvider.java | 100 +++++++++++++++++++++ .../apache_beam/yaml/standard_providers.yaml | 26 ++++++ 2 files changed, 126 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java new file mode 100644 index 00000000000..baeccd1ac8c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.transforms.providers; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for Flatten. + * + * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(SchemaTransformProvider.class) +public class FlattenTransformProvider + extends TypedSchemaTransformProvider<FlattenTransformProvider.Configuration> { + protected static final String OUTPUT_ROWS_TAG = "output"; + + @Override + protected Class<Configuration> configurationClass() { + return Configuration.class; + } + + @Override + protected SchemaTransform from(Configuration configuration) { + return new SchemaTransform() { + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + return PCollectionRowTuple.of( + OUTPUT_ROWS_TAG, + PCollectionList.of((Iterable<PCollection<Row>>) (Collection) input.expand().values()) + .apply(Flatten.pCollections()) + .setRowSchema( + ((PCollection<Row>) input.expand().values().iterator().next()).getSchema())); + } + }; + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:yaml:flatten:v1"; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.emptyList(); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_ROWS_TAG); + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + + public static Builder builder() { + return new AutoValue_FlattenTransformProvider_Configuration.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Configuration build(); + } + } +} diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml index c4671fa93fc..89b0cc9d553 100644 --- a/sdks/python/apache_beam/yaml/standard_providers.yaml +++ b/sdks/python/apache_beam/yaml/standard_providers.yaml @@ -24,6 +24,32 @@ version: BEAM_VERSION transforms: Sql: 'beam:external:java:sql:v1' + Flatten: 'beam:schematransform:org.apache.beam:yaml:flatten:v1' + LogForTesting: 'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1' + +# TODO(robertwb): Auto-detect redundantly provided transforms for maximal fusion. +- type: 'beamJar' + config: + gradle_target: 'sdks:java:io:expansion-service:shadowJar' + version: BEAM_VERSION + transforms: + Flatten: 'beam:schematransform:org.apache.beam:yaml:flatten:v1' + LogForTesting: 'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1' + +- type: 'beamJar' + config: + gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' + version: BEAM_VERSION + transforms: + Flatten: 'beam:schematransform:org.apache.beam:yaml:flatten:v1' + LogForTesting: 'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1' + +- type: 'beamJar' + config: + gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' + version: BEAM_VERSION + transforms: + Flatten: 'beam:schematransform:org.apache.beam:yaml:flatten:v1' LogForTesting: 'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1' - type: renaming