Hello,

✔️ I have a simple pipeline that transforms data with *SqlTransform*. I use
the *FlinkRunner* and, when I don't specify the *flink_master* option and
use an embedded Flink cluster, it works fine. However, if I use a local
Flink cluster and set the *flink_master* option to *localhost:8081*,
the expansion service running on Docker doesn't work. The Flink cluster
is started locally without Docker
(*./setup/flink-1.16.3/bin/start-cluster.sh*).

✔️ The pipeline code is below, followed by some troubleshooting
details.

import argparse
import logging
import typing

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class MyItem(typing.NamedTuple):
    id: int
    name: str
    value: float

beam.coders.registry.register_coder(MyItem, beam.coders.RowCoder)

def convert_to_item(row: list):
    cols = ["id", "name", "value"]
    return MyItem(**dict(zip(cols, row)))

def run():
    parser = argparse.ArgumentParser(
        description="Process statistics by user from website visit event"
    )
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    options = PipelineOptions()
    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "sql-transform",
        "environment_type": "LOOPBACK",
    }
    if opts.use_own is True:
        pipeline_opts = {**pipeline_opts, "flink_master": "localhost:8081"}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required; otherwise the workers complain when importing functions
    # defined in the main module
    options.view_as(SetupOptions).save_main_session = True

    query = """
    SELECT * FROM PCOLLECTION WHERE name = 'jack'
    """

    p = beam.Pipeline(options=options)
    (
        p
        | beam.Create([[1, "john", 123], [2, "jane", 234], [3, "jack", 345]])
        | beam.Map(convert_to_item).with_output_types(MyItem)
        | SqlTransform(query)
        | beam.Map(print)
    )

    logging.getLogger().setLevel(logging.WARN)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()

✔️ When I check the expansion service Docker container, it normally
downloads a JAR file and starts the SDK Fn Harness. However, it never reaches
the download step when I set *flink_master* to *localhost:8081*.
Because the service container gets stuck, the Flink task manager considers it
lost and the container gets killed.

2024/03/12 03:49:23 Provision info:
pipeline_options:{fields:{key:"beam:option:allow_non_deterministic_key_coders:v1"
...
runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
*2024/03/12 03:49:24 Downloaded:
/tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar
(sha256:fb9986c268c434b7357cf59674383f4b60123c1f163984e7500464fc8848fbf0,
size: 281440385)*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/apache/beam/jars/slf4j-jdk14.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
SDK Fn Harness started
Harness ID 1-2
Logging location url: "localhost:42133"

Control location url: "localhost:36449"

Status location null
Pipeline Options File pipeline_options.json
Pipeline Options File pipeline_options.json exists. Overriding existing
options.
Pipeline options
{"beam:option:allow_non_deterministic_key_coders:v1":false,..."beam:option:verify_row_values:v1":false}

✔️ The difference in the pipeline options with and without the
*flink_master* option is minimal, as shown below. However, I'm not sure
what makes the pipeline fail to run.

without flink_master - fields:{key:"beam:option:flink_master:v1"
value:{string_value:"[auto]"}}
with flink_master      - fields:{key:"beam:option:flink_master:v1"
value:{string_value:"http://localhost:8081"}}
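
For reference, this is roughly how the two option sets can be compared. It is only a minimal sketch, assuming both sets have been loaded as plain dicts. The helper name diff_options and the sample dicts are hypothetical; only the two flink_master values are taken from the logs above.

```python
def diff_options(a: dict, b: dict) -> dict:
    """Return {key: (value_in_a, value_in_b)} for every key whose value differs."""
    missing = object()  # sentinel so keys present in only one dict count as different
    keys = set(a) | set(b)
    return {
        k: (a.get(k), b.get(k))
        for k in sorted(keys)
        if a.get(k, missing) != b.get(k, missing)
    }


# Hypothetical excerpts of the two option sets (not the full provision info).
embedded = {
    "beam:option:flink_master:v1": "[auto]",
    "beam:option:job_name:v1": "sql-transform",
}
own_cluster = {
    "beam:option:flink_master:v1": "http://localhost:8081",
    "beam:option:job_name:v1": "sql-transform",
}

# Print only the options that differ between the two runs.
print(diff_options(embedded, own_cluster))
```

With the excerpts above, the only differing key is the flink_master option, which matches what I see in the provision info.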

Could you please let me know how to fix this issue?

Cheers,
Jaehyeon
