Hello,
✔️ I have a simple pipeline that transforms data with *SqlTransform*. I use
the *FlinkRunner* and, when I don't specify the *flink_master* option and
use an embedded Flink cluster, it works fine. However, if I use a local
Flink cluster and set the *flink_master* option to *localhost:8081*,
the expansion service running on Docker doesn't work. The Flink cluster
is started locally without Docker (
*./setup/flink-1.16.3/bin/start-cluster.sh*).
✔️ The pipeline code can be found below, and I've added some troubleshooting
details after it.
import argparse
import logging
import typing

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class MyItem(typing.NamedTuple):
    id: int
    name: str
    value: float


beam.coders.registry.register_coder(MyItem, beam.coders.RowCoder)


def convert_to_item(row: list):
    cols = ["id", "name", "value"]
    return MyItem(**dict(zip(cols, row)))


def run():
    parser = argparse.ArgumentParser(
        description="Process statistics by user from website visit event"
    )
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "sql-transform",
        "environment_type": "LOOPBACK",
    }
    if opts.use_own is True:
        pipeline_opts = {**pipeline_opts, "flink_master": "localhost:8081"}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required, otherwise it complains when importing worker functions
    options.view_as(SetupOptions).save_main_session = True

    query = """
    SELECT * FROM PCOLLECTION WHERE name = 'jack'
    """

    p = beam.Pipeline(options=options)
    (
        p
        | beam.Create([[1, "john", 123], [2, "jane", 234], [3, "jack", 345]])
        | beam.Map(convert_to_item).with_output_types(MyItem)
        | SqlTransform(query)
        | beam.Map(print)
    )

    logging.getLogger().setLevel(logging.WARN)
    logging.info("Building pipeline ...")
    p.run().wait_until_finish()


if __name__ == "__main__":
    run()
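For reference, the SQL query only keeps the row whose *name* is 'jack'. When the pipeline runs on the embedded cluster, the output matches this plain-Python equivalent (no Beam involved, just to show the expected result):

```python
import typing


class MyItem(typing.NamedTuple):
    id: int
    name: str
    value: float


rows = [[1, "john", 123], [2, "jane", 234], [3, "jack", 345]]
cols = ["id", "name", "value"]
items = [MyItem(**dict(zip(cols, r))) for r in rows]

# Equivalent of: SELECT * FROM PCOLLECTION WHERE name = 'jack'
selected = [item for item in items if item.name == "jack"]
print(selected)  # [MyItem(id=3, name='jack', value=345)]
```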
✔️ When I check the expansion service Docker container, it normally
downloads a JAR file and starts the SDK Fn Harness. However, it doesn't get
to the download step when I set *flink_master* to *localhost:8081*.
As the service container gets stuck, the Flink task manager considers it
lost and the container gets killed.
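To rule out basic connectivity problems, I used a small helper (my own, not part of Beam) to check whether a TCP endpoint is reachable. Note that inside a Docker container *localhost* refers to the container itself, so the JobManager REST endpoint running on the host may simply not be reachable from inside the harness container:

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# On the host this returns True while the Flink cluster is running;
# inside the harness container, "localhost" targets the container itself.
print(can_connect("localhost", 8081))
```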
2024/03/12 03:49:23 Provision info:
pipeline_options:{fields:{key:"beam:option:allow_non_deterministic_key_coders:v1"
...
runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
*2024/03/12 03:49:24 Downloaded:
/tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar
(sha256:fb9986c268c434b7357cf59674383f4b60123c1f163984e7500464fc8848fbf0,
size: 281440385)*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/apache/beam/jars/slf4j-jdk14.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
SDK Fn Harness started
Harness ID 1-2
Logging location url: "localhost:42133"
Control location url: "localhost:36449"
Status location null
Pipeline Options File pipeline_options.json
Pipeline Options File pipeline_options.json exists. Overriding existing
options.
Pipeline options
{"beam:option:allow_non_deterministic_key_coders:v1":false,..."beam:option:verify_row_values:v1":false}
✔️ As shown below, the difference in the pipeline options with and without
the *flink_master* option is quite minimal. However, I'm not sure what
makes the pipeline fail to run successfully.
without flink_master - fields:{key:"beam:option:flink_master:v1"
value:{string_value:"[auto]"}}
with flink_master - fields:{key:"beam:option:flink_master:v1"
value:{string_value:"http://localhost:8081"}}
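One thing I noticed is that the plain *localhost:8081* value ends up with an *http://* scheme in the proto. A rough sketch of that scheme-prefixing behaviour (my own illustration of what I observed, not Beam's actual code) would be:

```python
def normalize_flink_master(value: str) -> str:
    """Illustrative only: prefix a scheme when the address has none and is
    not one of the special placeholder values ([auto], [local])."""
    if value in ("[auto]", "[local]") or "://" in value:
        return value
    return "http://" + value


print(normalize_flink_master("localhost:8081"))  # http://localhost:8081
print(normalize_flink_master("[auto]"))          # [auto]
```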
Could you please advise how to fix this issue?
Cheers,
Jaehyeon