Hello Robert

Thanks for the explanation.

As I'm new to Beam, I don't fully understand the issue yet, but it doesn't seem to be a docker-in-docker issue: the Flink cluster runs locally without relying on Docker, and the pipeline works if I don't specify the *flink_master* option, in which case an embedded Flink cluster is used. The Flink cluster is simply started with *./setup/flink-1.16.3/bin/start-cluster.sh*. I see the tasks run in a Docker container with the host network, and I adjusted the Kafka bootstrap address accordingly to *host.docker.internal:29092*, so access to Kafka is not an issue either.

I also checked that *SqlTransform* and *ReadFromKafka* rely on *BeamJarExpansionService()*, each with its own Gradle target, and I'm not sure whether what you recommend is to launch my own expansion service.
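If what you recommend amounts to pointing both transforms at a single expansion service backed by one combined jar, I guess it would look roughly like the sketch below - untested on my side, with */path/to/uber.jar* standing in for the combined jar:

from apache_beam.io import kafka
from apache_beam.transforms.external import JavaJarExpansionService
from apache_beam.transforms.sql import SqlTransform

# Placeholder: a jar that bundles the SQL and Kafka expansion services.
expansion_service = JavaJarExpansionService("/path/to/uber.jar")

query = "..."  # the same SQL as in my original pipeline below

# Hand the same service to both cross-language transforms so that both
# expansions come from the same jar and can share one environment.
kafka_read = kafka.ReadFromKafka(
    consumer_config={"bootstrap.servers": "host.docker.internal:29092"},
    topics=["website-visit"],
    expansion_service=expansion_service,
)
sql_transform = SqlTransform(query, expansion_service=expansion_service)

Or, if I read your *--beamServices* flag right, the Python-side counterpart appears to be the *beam_services* pipeline option carrying the same Gradle-target-to-jar mapping.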
Basically, at this moment, I'm not sure why the pipeline works when I don't specify the *flink_master* option but fails when I do.

Cheers,
Jaehyeon

On Sat, 9 Mar 2024 at 11:24, Robert Bradshaw via user <user@beam.apache.org> wrote:

> The way that cross-language pipelines work is that each transform has
> an attached "environment" in which its workers should be instantiated.
> By default these are identified as docker images + a possible set of
> dependencies. Transforms with the same environment can be colocated.
>
> There is a tension between having large environments that can be
> shared and more targeted ones that have fewer dependencies.
>
> By default the environment for SQL relies on Beam's jar built as
> "sdks:java:extensions:sql:expansion-service:shadowJar" and the
> environment for Kafka relies on Beam's jar for
> "sdks:java:io:expansion-service:shadowJar". To use a single jar you
> could create a jar with both of these (plus whatever else you need)
> and use that (the easiest would be to pass the flag
>
> --beamServices='{
>   "sdks:java:extensions:sql:expansion-service:shadowJar": "/path/to/uber.jar",
>   "sdks:java:io:expansion-service:shadowJar": "/path/to/uber.jar"
> }')
>
> That being said, it should work just fine with distinct jars on an
> arbitrary setup as well (and the fact that it works locally hints at
> something being up with that setup). Not sure what your flink master
> configuration is like, but maybe it's a docker-in-docker issue?
>
> On Wed, Mar 6, 2024 at 8:30 PM Jaehyeon Kim <dott...@gmail.com> wrote:
> >
> > It may be a configuration issue. It works if I don't specify
> > flink_master, which uses an embedded flink cluster.
> >
> > On Thu, 7 Mar 2024 at 12:47, Jaehyeon Kim <dott...@gmail.com> wrote:
> >>
> >> It works fine if I only use Kafka read/write, as I only see a single
> >> container - two transforms (read and write) but a single container.
> >>
> >> If I add SqlTransform, however, another container is created and it
> >> begins to produce errors. My speculation is that the containers don't
> >> recognise each other and get killed by the Flink task manager. I see
> >> containers repeatedly being created and killed.
> >>
> >> Does every multi-language pipeline run in a separate container?
> >>
> >> On Thu, 7 Mar 2024, 12:35 pm Robert Bradshaw via user, <user@beam.apache.org> wrote:
> >>>
> >>> Oh, sorry, I didn't see that.
> >>>
> >>> I would look earlier in the logs and see why it failed to bring up the
> >>> containers (or, if they started, look in the container logs to see why
> >>> they died).
> >>>
> >>> On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim <dott...@gmail.com> wrote:
> >>> >
> >>> > I am not using the Python local runner but the Flink runner. A Flink
> >>> > cluster is started locally.
> >>> >
> >>> > On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user <user@beam.apache.org> wrote:
> >>> >>
> >>> >> Streaming portable pipelines are not yet supported on the Python
> >>> >> local runner.
> >>> >>
> >>> >> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <dott...@gmail.com> wrote:
> >>> >> >
> >>> >> > Hello,
> >>> >> >
> >>> >> > I use the python SDK and my pipeline reads messages from Kafka
> >>> >> > and transforms them via SQL. I see two containers are created, but it
> >>> >> > seems that they don't communicate with each other, so the Flink task
> >>> >> > manager keeps killing the containers. The Flink cluster runs locally.
> >>> >> > Is there a way to run two multi-language pipelines (running on Docker)
> >>> >> > that communicate with each other?
> >>> >> >
> >>> >> > Cheers,
> >>> >> > Jaehyeon
> >>> >> >
> >>> >> > import argparse
> >>> >> > import logging
> >>> >> > import os
> >>> >> >
> >>> >> > import apache_beam as beam
> >>> >> > from apache_beam.io import kafka
> >>> >> > from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
> >>> >> > from apache_beam.transforms.sql import SqlTransform
> >>> >> >
> >>> >> > def run():
> >>> >> >     parser = argparse.ArgumentParser(
> >>> >> >         description="Process statistics by user from website visit event"
> >>> >> >     )
> >>> >> >     parser.add_argument(
> >>> >> >         "--inputs",
> >>> >> >         default="inputs",
> >>> >> >         help="Specify the folder name where event records are saved",
> >>> >> >     )
> >>> >> >     parser.add_argument(
> >>> >> >         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
> >>> >> >     )
> >>> >> >     opts = parser.parse_args()
> >>> >> >
> >>> >> >     pipeline_opts = {
> >>> >> >         "runner": opts.runner,
> >>> >> >         "flink_master": "localhost:8081",
> >>> >> >         "job_name": "traffic-agg-sql",
> >>> >> >         "environment_type": "LOOPBACK",
> >>> >> >         "streaming": True,
> >>> >> >         "parallelism": 3,
> >>> >> >         "experiments": [
> >>> >> >             "use_deprecated_read"
> >>> >> >         ],  ## https://github.com/apache/beam/issues/20979
> >>> >> >         "checkpointing_interval": "60000",
> >>> >> >     }
> >>> >> >     options = PipelineOptions([], **pipeline_opts)
> >>> >> >     # Required, else it will complain when importing worker functions
> >>> >> >     options.view_as(SetupOptions).save_main_session = True
> >>> >> >
> >>> >> >     query = """
> >>> >> >     WITH cte AS (
> >>> >> >         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
> >>> >> >         FROM PCOLLECTION
> >>> >> >     )
> >>> >> >     SELECT
> >>> >> >         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
> >>> >> >         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
> >>> >> >         COUNT(*) AS page_view
> >>> >> >     FROM cte
> >>> >> >     GROUP BY
> >>> >> >         TUMBLE(ts, INTERVAL '10' SECOND), id
> >>> >> >     """
> >>> >> >
> >>> >> >     # decode_message, parse_json, format_timestamp and EventLog are
> >>> >> >     # defined elsewhere in the module.
> >>> >> >     p = beam.Pipeline(options=options)
> >>> >> >     (
> >>> >> >         p
> >>> >> >         | "Read from Kafka"
> >>> >> >         >> kafka.ReadFromKafka(
> >>> >> >             consumer_config={
> >>> >> >                 "bootstrap.servers": os.getenv(
> >>> >> >                     "BOOTSTRAP_SERVERS",
> >>> >> >                     "host.docker.internal:29092",
> >>> >> >                 ),
> >>> >> >                 "auto.offset.reset": "earliest",
> >>> >> >                 # "enable.auto.commit": "true",
> >>> >> >                 "group.id": "traffic-agg",
> >>> >> >             },
> >>> >> >             topics=["website-visit"],
> >>> >> >         )
> >>> >> >         | "Decode messages" >> beam.Map(decode_message)
> >>> >> >         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> >>> >> >         | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
> >>> >> >         | "Count per minute" >> SqlTransform(query)
> >>> >> >         | beam.Map(print)
> >>> >> >     )
> >>> >> >
> >>> >> >     logging.getLogger().setLevel(logging.INFO)
> >>> >> >     logging.info("Building pipeline ...")
> >>> >> >
> >>> >> >     p.run().wait_until_finish()
> >>> >> >
> >>> >> > Here is the error message from the Flink UI.
> >>> >> >
> >>> >> > 2024-03-07 12:01:41
> >>> >> > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
> >>> >> >     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
> >>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
> >>> >> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >>> >> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> >>> >> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >>> >> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >>> >> >     at java.base/java.lang.Thread.run(Thread.java:829)
> >>> >> > Caused by: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >>> >> >     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
> >>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
> >>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
> >>> >> >     ... 20 more
> >>> >> >     Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. stderr: Error response from daemon: Cannot kill container: cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not running
> >>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
> >>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
> >>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
> >>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)