Streaming portable pipelines are not yet supported on the Python local runner.
On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>
> Hello,
>
> I use the python SDK and my pipeline reads messages from Kafka and transforms
> via SQL. I see two containers are created but it seems that they don't
> communicate with each other so that the Flink task manager keeps killing the
> containers. The Flink cluster runs locally. Is there a way to run two
> multi-language pipelines (running on Docker) communicating with each other?
>
> Cheers,
> Jaehyeon
>
> def run():
>     parser = argparse.ArgumentParser(
>         description="Process statistics by user from website visit event"
>     )
>     parser.add_argument(
>         "--inputs",
>         default="inputs",
>         help="Specify folder name that event records are saved",
>     )
>     parser.add_argument(
>         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
>     )
>     opts = parser.parse_args()
>
>     options = PipelineOptions()
>     pipeline_opts = {
>         "runner": opts.runner,
>         "flink_master": "localhost:8081",
>         "job_name": "traffic-agg-sql",
>         "environment_type": "LOOPBACK",
>         "streaming": True,
>         "parallelism": 3,
>         "experiments": [
>             "use_deprecated_read"
>         ],  ## https://github.com/apache/beam/issues/20979
>         "checkpointing_interval": "60000",
>     }
>     options = PipelineOptions([], **pipeline_opts)
>     # Required, else it will complain that when importing worker functions
>     options.view_as(SetupOptions).save_main_session = True
>
>     query = """
>     WITH cte AS (
>         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
>         FROM PCOLLECTION
>     )
>     SELECT
>         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
>         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
>         COUNT(*) AS page_view
>     FROM cte
>     GROUP BY
>         TUMBLE(ts, INTERVAL '10' SECOND), id
>     """
>
>     p = beam.Pipeline(options=options)
>     (
>         p
>         | "Read from Kafka"
>         >> kafka.ReadFromKafka(
>             consumer_config={
>                 "bootstrap.servers": os.getenv(
>                     "BOOTSTRAP_SERVERS",
>                     "host.docker.internal:29092",
>                 ),
>                 "auto.offset.reset": "earliest",
>                 # "enable.auto.commit": "true",
>                 "group.id": "traffic-agg",
>             },
>             topics=["website-visit"],
>         )
>         | "Decode messages" >> beam.Map(decode_message)
>         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
>         | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
>         | "Count per minute" >> SqlTransform(query)
>         | beam.Map(print)
>     )
>
>     logging.getLogger().setLevel(logging.INFO)
>     logging.info("Building pipeline ...")
>
>     p.run().wait_until_finish()
>
> Here is the error message from the flink UI.
>
> 2024-03-07 12:01:41
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
>     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
>     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
>     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
>     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
>     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
>     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
>     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
>     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
>     ... 20 more
>     Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. stderr: Error response from daemon: Cannot kill container: cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not running
>         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
>         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
>         at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
>         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
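
For anyone reading a trace like the one quoted above, most of it is cache and task plumbing; the lines that name an exception (the top-level one, "Caused by:", and "Suppressed:") carry the actual failure. Below is a minimal sketch for pulling just those lines out, assuming the trace has been copied from the Flink UI as plain text. The helper name `summarize_trace` and the regular expression are illustrative only and are not part of Beam or Flink.

```python
import re

# Matches lines of a Java stack trace that name an exception: the top-level
# exception, "Caused by:" lines, and "Suppressed:" lines. Frame lines that
# start with "at ..." and "... N more" lines do not match.
_EXC_LINE = re.compile(
    r"^\s*(?:(?P<kind>Caused by|Suppressed):\s*)?"
    r"(?P<cls>[A-Za-z_][\w.$]*(?:Exception|Error))"
    r"(?::\s*(?P<msg>.*))?$"
)


def summarize_trace(trace: str) -> list[tuple[str, str, str]]:
    """Return (kind, exception class, message) for each exception line.

    `kind` is "Top" for an exception line without a prefix, otherwise
    "Caused by" or "Suppressed". Hypothetical helper for illustration.
    """
    found = []
    for line in trace.splitlines():
        match = _EXC_LINE.match(line)
        if match:
            found.append(
                (
                    match.group("kind") or "Top",
                    match.group("cls"),
                    (match.group("msg") or "").strip(),
                )
            )
    return found
```

Applied to the trace in this thread, the "Caused by" entry would be the `IllegalStateException` raised from `DockerEnvironmentFactory.createEnvironment`, and the "Suppressed" entry would be the failed `docker kill`, i.e. the container had already exited by the time the runner tried to use it.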