Oh, sorry, I didn't see that. I would look earlier in the logs and see why it failed to bring up the containers (or, if they started, look in the container logs to see why they died).
On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>
> I am not using the python local runner but the flink runner. A flink
> cluster is started locally.
>
> On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user
> <user@beam.apache.org> wrote:
>>
>> Streaming portable pipelines are not yet supported on the Python local
>> runner.
>>
>> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > I use the python SDK, and my pipeline reads messages from Kafka and
>> > transforms them via SQL. I see two containers are created, but it
>> > seems they don't communicate with each other, so the Flink task
>> > manager keeps killing the containers. The Flink cluster runs locally.
>> > Is there a way to run two multi-language pipelines (running on
>> > Docker) that communicate with each other?
>> >
>> > Cheers,
>> > Jaehyeon
>> >
>> > def run():
>> >     parser = argparse.ArgumentParser(
>> >         description="Process statistics by user from website visit event"
>> >     )
>> >     parser.add_argument(
>> >         "--inputs",
>> >         default="inputs",
>> >         help="Specify folder name that event records are saved",
>> >     )
>> >     parser.add_argument(
>> >         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
>> >     )
>> >     opts = parser.parse_args()
>> >
>> >     pipeline_opts = {
>> >         "runner": opts.runner,
>> >         "flink_master": "localhost:8081",
>> >         "job_name": "traffic-agg-sql",
>> >         "environment_type": "LOOPBACK",
>> >         "streaming": True,
>> >         "parallelism": 3,
>> >         "experiments": [
>> >             "use_deprecated_read"
>> >         ],  ## https://github.com/apache/beam/issues/20979
>> >         "checkpointing_interval": "60000",
>> >     }
>> >     options = PipelineOptions([], **pipeline_opts)
>> >     # Required; otherwise it complains when importing worker functions
>> >     options.view_as(SetupOptions).save_main_session = True
>> >
>> >     query = """
>> >     WITH cte AS (
>> >         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
>> >         FROM PCOLLECTION
>> >     )
>> >     SELECT
>> >         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
>> >         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
>> >         COUNT(*) AS page_view
>> >     FROM cte
>> >     GROUP BY
>> >         TUMBLE(ts, INTERVAL '10' SECOND), id
>> >     """
>> >
>> >     p = beam.Pipeline(options=options)
>> >     (
>> >         p
>> >         | "Read from Kafka"
>> >         >> kafka.ReadFromKafka(
>> >             consumer_config={
>> >                 "bootstrap.servers": os.getenv(
>> >                     "BOOTSTRAP_SERVERS",
>> >                     "host.docker.internal:29092",
>> >                 ),
>> >                 "auto.offset.reset": "earliest",
>> >                 # "enable.auto.commit": "true",
>> >                 "group.id": "traffic-agg",
>> >             },
>> >             topics=["website-visit"],
>> >         )
>> >         | "Decode messages" >> beam.Map(decode_message)
>> >         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
>> >         | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
>> >         | "Count per minute" >> SqlTransform(query)
>> >         | beam.Map(print)
>> >     )
>> >
>> >     logging.getLogger().setLevel(logging.INFO)
>> >     logging.info("Building pipeline ...")
>> >
>> >     p.run().wait_until_finish()
>> >
>> > Here is the error message from the flink UI.
>> >
>> > 2024-03-07 12:01:41
>> > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
>> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
>> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
>> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
>> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
>> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
>> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
>> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
>> >     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>> >     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
>> >     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
>> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
>> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>> >     at java.base/java.lang.Thread.run(Thread.java:829)
>> > Caused by: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>> >     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
>> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
>> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
>> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
>> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
>> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
>> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
>> >     ... 20 more
>> >     Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. stderr: Error response from daemon: Cannot kill container: cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not running
>> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
>> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
>> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
>> >         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
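[Editor's note: for readers unfamiliar with Calcite's tumbling windows, the per-id, 10-second window count that the SQL query in the thread computes can be sketched in plain Python. This is an illustrative standalone helper, not part of the pipeline; the function names are hypothetical.]

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW_SECONDS = 10  # matches INTERVAL '10' SECOND in the query


def tumble_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 10-second tumbling window."""
    epoch = ts.timestamp()
    return datetime.fromtimestamp(epoch - epoch % WINDOW_SECONDS, tz=timezone.utc)


def page_views(events):
    """Count events per (window_start, window_end, id), as the SQL's
    GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id does."""
    counts = Counter()
    for user_id, ts in events:
        start = tumble_start(ts)
        end = start + timedelta(seconds=WINDOW_SECONDS)
        counts[(start.isoformat(), end.isoformat(), user_id)] += 1
    return counts
```

Unlike this batch sketch, SqlTransform on a streaming pipeline emits each window's count only once the watermark passes the window end.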
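[Editor's note: the thread never shows `decode_message`, `parse_json`, or the `EventLog` schema. A plausible sketch of what they might look like is below, assuming `ReadFromKafka` emits (key, value) byte pairs carrying JSON values; the `EventLog` fields shown here are assumptions, not the poster's actual schema.]

```python
import json
import typing


class EventLog(typing.NamedTuple):
    # Field names are assumptions; the thread does not show the real schema.
    id: str
    event_datetime: str


def decode_message(kafka_kv: tuple) -> str:
    """ReadFromKafka (without metadata) emits (key, value) byte pairs;
    keep only the value, decoded to text."""
    return kafka_kv[1].decode("utf-8")


def parse_json(element: str) -> EventLog:
    """Turn a JSON string into a schema'd row that SqlTransform can query."""
    parsed = json.loads(element)
    return EventLog(id=parsed["id"], event_datetime=parsed["event_datetime"])
```

Giving the rows a typed schema like this (via `with_output_types(EventLog)`) is what lets `SELECT id, CAST(event_datetime AS TIMESTAMP) ...` resolve column names in the query.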