Hello Robert

Thanks for the explanation.

As I'm new to Beam, I don't fully understand the issue, but it doesn't
seem to be a docker-in-docker issue: the Flink cluster runs locally
without Docker (it is started simply with
*./setup/flink-1.16.3/bin/start-cluster.sh*), and the pipeline works if
I don't specify the *flink_master* option, in which case an embedded
Flink cluster is used.
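
For reference, here is a minimal sketch of the two configurations I'm
comparing (the option names are the standard FlinkRunner pipeline
options; the rest of the pipeline is unchanged):

from apache_beam.options.pipeline_options import PipelineOptions

# Works: without flink_master, the runner brings up an embedded Flink
# cluster for the job.
embedded_opts = PipelineOptions(runner="FlinkRunner", streaming=True)

# Fails: the same pipeline submitted to the locally started session
# cluster.
session_opts = PipelineOptions(
    runner="FlinkRunner",
    flink_master="localhost:8081",
    streaming=True,
)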

I see the tasks run in Docker containers on the host network, and I
adjusted the Kafka bootstrap address accordingly to
*host.docker.internal:29092*, so access to Kafka shouldn't be an issue
either.
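
As a quick sanity check (a stdlib-only sketch; it assumes it is run
somewhere that resolves *host.docker.internal*, e.g. inside one of the
worker containers), I can confirm the broker address accepts TCP
connections:

import socket

# Raises OSError if the Kafka bootstrap address is unreachable from
# where this runs.
with socket.create_connection(("host.docker.internal", 29092), timeout=5):
    print("Kafka bootstrap address is reachable")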

I also checked that *SqlTransform* and *ReadFromKafka* each rely on
*BeamJarExpansionService*() with its own Gradle target, and I'm not
sure whether what you recommend is to launch my own expansion service.
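
In case that is the suggestion, I guess it would look roughly like the
sketch below - the jar path is a placeholder, and I'm relying on the
*expansion_service* argument that both transforms accept:

from apache_beam.io import kafka
from apache_beam.transforms.external import JavaJarExpansionService
from apache_beam.transforms.sql import SqlTransform

# Hypothetical uber jar that bundles both expansion services; the SDK
# starts it on demand.
expansion_service = JavaJarExpansionService("/path/to/uber.jar")

query = "SELECT * FROM PCOLLECTION"  # stand-in for the SQL below

kafka.ReadFromKafka(
    consumer_config={"bootstrap.servers": "host.docker.internal:29092"},
    topics=["website-visit"],
    expansion_service=expansion_service,
)
SqlTransform(query, expansion_service=expansion_service)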

Basically, at this moment, I'm not sure why the pipeline works if I
don't specify the *flink_master* option but fails with it.

Cheers,
Jaehyeon



On Sat, 9 Mar 2024 at 11:24, Robert Bradshaw via user <user@beam.apache.org>
wrote:

> The way that cross-language pipelines work is that each transform has
> an attached "environment" in which its workers should be instantiated.
> By default these are identified as docker images + a possible set of
> dependencies. Transforms with the same environment can be colocated.
>
> There is a tension between having large environments that can be
> shared and more targeted ones that have fewer dependencies.
>
> By default the environment for SQL relies on Beam's jar built as
> "sdks:java:extensions:sql:expansion-service:shadowJar" and the
> environment for Kafka relies on Beam's jar for
> "sdks:java:io:expansion-service:shadowJar" To use a single jar you
> could create a jar with both of these (plus whatever else you need)
> and use that (the easiest would be to pass the flag
>
> --beamServices='{
>   "sdks:java:extensions:sql:expansion-service:shadowJar":
> "/path/to/uber.jar",
>   "sdks:java:io:expansion-service:shadowJar": "/path/to/uber.jar"
> }'
>
> That being said, it should work just fine with distinct jars on an
> arbitrary setup as well (and the fact that it works locally hints to
> something being up with that setup). Not sure what your flink master
> configuration is like, but maybe it's a docker-in-docker issue?
>
> On Wed, Mar 6, 2024 at 8:30 PM Jaehyeon Kim <dott...@gmail.com> wrote:
> >
> > It may be a configuration issue. It works if I don't specify
> > flink_master, which uses an embedded Flink cluster.
> >
> > On Thu, 7 Mar 2024 at 12:47, Jaehyeon Kim <dott...@gmail.com> wrote:
> >>
> >> It works fine if I only use Kafka read/write, as I only see a single
> >> container - two transforms (read and write) but a single container.
> >>
> >> If I add SqlTransform, however, another container is created and
> >> errors start occurring. My speculation is that the containers don't
> >> recognise each other and get killed by the Flink task manager. I see
> >> containers keep getting created and killed.
> >>
> >> Does every multi-language pipeline run in a separate container?
> >>
> >> On Thu, 7 Mar 2024, 12:35 pm Robert Bradshaw via user, <
> user@beam.apache.org> wrote:
> >>>
> >>> Oh, sorry, I didn't see that.
> >>>
> >>> I would look earlier in the logs and see why it failed to bring up the
> >>> containers (or, if they started, look in the container logs to see why
> >>> they died).
> >>>
> >>> On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim <dott...@gmail.com> wrote:
> >>> >
> >>> > I am not using the Python local runner but the Flink runner. A
> >>> > Flink cluster is started locally.
> >>> >
> >>> > On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user <
> user@beam.apache.org> wrote:
> >>> >>
> >>> >> Streaming portable pipelines are not yet supported on the Python
> local runner.
> >>> >>
> >>> >> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <dott...@gmail.com>
> wrote:
> >>> >> >
> >>> >> > Hello,
> >>> >> >
> >>> >> > I use the Python SDK, and my pipeline reads messages from Kafka
> >>> >> > and transforms them via SQL. I see two containers are created,
> >>> >> > but it seems that they don't communicate with each other, so the
> >>> >> > Flink task manager keeps killing the containers. The Flink
> >>> >> > cluster runs locally. Is there a way to run two multi-language
> >>> >> > pipelines (running on Docker) that communicate with each other?
> >>> >> >
> >>> >> > Cheers,
> >>> >> > Jaehyeon
> >>> >> >
> >>> >> >
> >>> >> >
> >>> >> > import argparse
> >>> >> > import logging
> >>> >> > import os
> >>> >> >
> >>> >> > import apache_beam as beam
> >>> >> > from apache_beam.io import kafka
> >>> >> > from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
> >>> >> > from apache_beam.transforms.sql import SqlTransform
> >>> >> >
> >>> >> > # decode_message, parse_json, format_timestamp and EventLog are
> >>> >> > # defined elsewhere in the script.
> >>> >> >
> >>> >> >
> >>> >> > def run():
> >>> >> >     parser = argparse.ArgumentParser(
> >>> >> >         description="Process statistics by user from website visit event"
> >>> >> >     )
> >>> >> >     parser.add_argument(
> >>> >> >         "--inputs",
> >>> >> >         default="inputs",
> >>> >> >         help="Specify the folder where event records are saved",
> >>> >> >     )
> >>> >> >     parser.add_argument(
> >>> >> >         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
> >>> >> >     )
> >>> >> >     opts = parser.parse_args()
> >>> >> >
> >>> >> >     pipeline_opts = {
> >>> >> >         "runner": opts.runner,
> >>> >> >         "flink_master": "localhost:8081",
> >>> >> >         "job_name": "traffic-agg-sql",
> >>> >> >         "environment_type": "LOOPBACK",
> >>> >> >         "streaming": True,
> >>> >> >         "parallelism": 3,
> >>> >> >         "experiments": [
> >>> >> >             "use_deprecated_read"
> >>> >> >         ],  ## https://github.com/apache/beam/issues/20979
> >>> >> >         "checkpointing_interval": "60000",
> >>> >> >     }
> >>> >> >     options = PipelineOptions([], **pipeline_opts)
> >>> >> >     # Required, else it will complain when importing worker functions
> >>> >> >     options.view_as(SetupOptions).save_main_session = True
> >>> >> >
> >>> >> >     query = """
> >>> >> >     WITH cte AS (
> >>> >> >         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
> >>> >> >         FROM PCOLLECTION
> >>> >> >     )
> >>> >> >     SELECT
> >>> >> >         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
> >>> >> >         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
> >>> >> >         COUNT(*) AS page_view
> >>> >> >     FROM cte
> >>> >> >     GROUP BY
> >>> >> >         TUMBLE(ts, INTERVAL '10' SECOND), id
> >>> >> >     """
> >>> >> >
> >>> >> >     p = beam.Pipeline(options=options)
> >>> >> >     (
> >>> >> >         p
> >>> >> >         | "Read from Kafka"
> >>> >> >         >> kafka.ReadFromKafka(
> >>> >> >             consumer_config={
> >>> >> >                 "bootstrap.servers": os.getenv(
> >>> >> >                     "BOOTSTRAP_SERVERS",
> >>> >> >                     "host.docker.internal:29092",
> >>> >> >                 ),
> >>> >> >                 "auto.offset.reset": "earliest",
> >>> >> >                 # "enable.auto.commit": "true",
> >>> >> >                 "group.id": "traffic-agg",
> >>> >> >             },
> >>> >> >             topics=["website-visit"],
> >>> >> >         )
> >>> >> >         | "Decode messages" >> beam.Map(decode_message)
> >>> >> >         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> >>> >> >         | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
> >>> >> >         | "Count per minute" >> SqlTransform(query)
> >>> >> >         | beam.Map(print)
> >>> >> >     )
> >>> >> >
> >>> >> >     logging.getLogger().setLevel(logging.INFO)
> >>> >> >     logging.info("Building pipeline ...")
> >>> >> >
> >>> >> >     p.run().wait_until_finish()
> >>> >> >
> >>> >> > Here is the error message from the Flink UI.
> >>> >> >
> >>> >> > 2024-03-07 12:01:41
> >>> >> > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
> >>> >> >     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
> >>> >> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >>> >> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> >>> >> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >>> >> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >>> >> >     at java.base/java.lang.Thread.run(Thread.java:829)
> >>> >> > Caused by: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >>> >> >     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
> >>> >> >     ... 20 more
> >>> >> >     Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. stderr: Error response from daemon: Cannot kill container: cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not running
> >>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
> >>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
> >>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
> >>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
> >>> >> >
> >>> >> >
>
