This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3364271e2c9 Pass runtime configs & variables to BeamSqlSeekableTable (#28253) 3364271e2c9 is described below commit 3364271e2c95be267e4e106415a010d46ea35f0b Author: gabry.wu <gabr...@apache.org> AuthorDate: Tue Sep 5 11:17:32 2023 +0800 Pass runtime configs & variables to BeamSqlSeekableTable (#28253) * closing https://github.com/apache/beam/issues/28145 * remove unsupported parameter & add parameter type to context * remove unnecessary semicolon --- .../beam/sdk/extensions/sql/BeamSqlSeekableTable.java | 12 ++++++++++-- .../extensions/sql/impl/transform/BeamJoinTransforms.java | 15 +++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java index 95f4b7f47f1..7b924cf6b6d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java @@ -19,6 +19,8 @@ package org.apache.beam.sdk.extensions.sql; import java.io.Serializable; import java.util.List; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.Row; /** @@ -27,11 +29,17 @@ import org.apache.beam.sdk.values.Row; */ public interface BeamSqlSeekableTable extends Serializable { /** prepare the instance. */ - default void setUp() {}; + default void setUp() {} + + default void startBundle( + DoFn<Row, Row>.StartBundleContext context, PipelineOptions pipelineOptions) {} + + default void finishBundle( + DoFn<Row, Row>.FinishBundleContext context, PipelineOptions pipelineOptions) {} /** return a list of {@code Row} with given key set. */ List<Row> seekRow(Row lookupSubRow); /** cleanup resources of the instance. */ - default void tearDown() {}; + default void tearDown() {} } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java index a30822de151..e4d62c2b5de 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable; import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexFieldAccess; import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexInputRef; import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexNode; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.FieldAccessDescriptor; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; @@ -155,6 +156,20 @@ public class BeamJoinTransforms { seekableTable.setUp(); } + @StartBundle + public void startBundle( + DoFn<Row, Row>.StartBundleContext context, + PipelineOptions pipelineOptions) { + seekableTable.startBundle(context, pipelineOptions); + } + + @FinishBundle + public void finishBundle( + DoFn<Row, Row>.FinishBundleContext context, + PipelineOptions pipelineOptions) { + seekableTable.finishBundle(context, pipelineOptions); + } + @ProcessElement public void processElement(ProcessContext context) { Row factRow = context.element();