This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3364271e2c9 Pass runtime configs & variables to BeamSqlSeekableTable (#28253)
3364271e2c9 is described below

commit 3364271e2c95be267e4e106415a010d46ea35f0b
Author: gabry.wu <gabr...@apache.org>
AuthorDate: Tue Sep 5 11:17:32 2023 +0800

    Pass runtime configs & variables to BeamSqlSeekableTable (#28253)
    
    * closing https://github.com/apache/beam/issues/28145
    
    * remove unsupported parameter & add parameter type to context
    
    * remove unnecessary semicolon
---
 .../beam/sdk/extensions/sql/BeamSqlSeekableTable.java     | 12 ++++++++++--
 .../extensions/sql/impl/transform/BeamJoinTransforms.java | 15 +++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
index 95f4b7f47f1..7b924cf6b6d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.extensions.sql;
 
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.Row;
 
 /**
@@ -27,11 +29,17 @@ import org.apache.beam.sdk.values.Row;
  */
 public interface BeamSqlSeekableTable extends Serializable {
   /** prepare the instance. */
-  default void setUp() {};
+  default void setUp() {}
+
+  default void startBundle(
+      DoFn<Row, Row>.StartBundleContext context, PipelineOptions pipelineOptions) {}
+
+  default void finishBundle(
+      DoFn<Row, Row>.FinishBundleContext context, PipelineOptions pipelineOptions) {}
 
   /** return a list of {@code Row} with given key set. */
   List<Row> seekRow(Row lookupSubRow);
 
   /** cleanup resources of the instance. */
-  default void tearDown() {};
+  default void tearDown() {}
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index a30822de151..e4d62c2b5de 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
 import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexFieldAccess;
 import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexInputRef;
 import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexNode;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -155,6 +156,20 @@ public class BeamJoinTransforms {
                       seekableTable.setUp();
                     }
 
+                    @StartBundle
+                    public void startBundle(
+                        DoFn<Row, Row>.StartBundleContext context,
+                        PipelineOptions pipelineOptions) {
+                      seekableTable.startBundle(context, pipelineOptions);
+                    }
+
+                    @FinishBundle
+                    public void finishBundle(
+                        DoFn<Row, Row>.FinishBundleContext context,
+                        PipelineOptions pipelineOptions) {
+                      seekableTable.finishBundle(context, pipelineOptions);
+                    }
+
                     @ProcessElement
                     public void processElement(ProcessContext context) {
                       Row factRow = context.element();

Reply via email to