Streaming portable pipelines are not yet supported on the Python local runner.

On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>
> Hello,
>
> I use the Python SDK, and my pipeline reads messages from Kafka and transforms 
> them via SQL. I see that two containers are created, but they don't seem to 
> communicate with each other, so the Flink task manager keeps killing the 
> containers. The Flink cluster runs locally. Is there a way to run two 
> multi-language transforms (each in its own Docker container) so that they 
> communicate with each other?
>
> Cheers,
> Jaehyeon
>
>
>
> def run():
>     parser = argparse.ArgumentParser(
>         description="Process statistics by user from website visit event"
>     )
>     parser.add_argument(
>         "--inputs",
>         default="inputs",
>         help="Specify folder name that event records are saved",
>     )
>     parser.add_argument(
>         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
>     )
>     opts = parser.parse_args()
>
>     options = PipelineOptions()
>     pipeline_opts = {
>         "runner": opts.runner,
>         "flink_master": "localhost:8081",
>         "job_name": "traffic-agg-sql",
>         "environment_type": "LOOPBACK",
>         "streaming": True,
>         "parallelism": 3,
>         "experiments": [
>             "use_deprecated_read"
>         ],  ## https://github.com/apache/beam/issues/20979
>         "checkpointing_interval": "60000",
>     }
>     options = PipelineOptions([], **pipeline_opts)
>     # Required, else it will complain when importing worker functions
>     options.view_as(SetupOptions).save_main_session = True
>
>     query = """
>     WITH cte AS (
>         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
>         FROM PCOLLECTION
>     )
>     SELECT
>         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS 
> window_start,
>         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
>         COUNT(*) AS page_view
>     FROM cte
>     GROUP BY
>         TUMBLE(ts, INTERVAL '10' SECOND), id
>     """
>
>     p = beam.Pipeline(options=options)
>     (
>         p
>         | "Read from Kafka"
>         >> kafka.ReadFromKafka(
>             consumer_config={
>                 "bootstrap.servers": os.getenv(
>                     "BOOTSTRAP_SERVERS",
>                     "host.docker.internal:29092",
>                 ),
>                 "auto.offset.reset": "earliest",
>                 # "enable.auto.commit": "true",
>                 "group.id": "traffic-agg",
>             },
>             topics=["website-visit"],
>         )
>         | "Decode messages" >> beam.Map(decode_message)
>         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
>         | "Format timestamp" >> 
> beam.Map(format_timestamp).with_output_types(EventLog)
>         | "Count per minute" >> SqlTransform(query)
>         | beam.Map(print)
>     )
>
>     logging.getLogger().setLevel(logging.INFO)
>     logging.info("Building pipeline ...")
>
>     p.run().wait_until_finish()
>
> Here is the error message from the flink UI.
>
> 2024-03-07 12:01:41
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.IllegalStateException: No container running for id 
> cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>     at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
>     at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
>     at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
>     at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
>     at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
>     at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
>     at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
>     at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
>     at 
> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>     at 
> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
>     at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.IllegalStateException: No container running for id 
> cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>     at 
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
>     at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
>     at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
>     at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
>     at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
>     at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
>     at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
>     ... 20 more
>     Suppressed: java.io.IOException: Received exit code 1 for command 'docker 
> kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. 
> stderr: Error response from daemon: Cannot kill container: 
> cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container 
> cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not 
> running
>         at 
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
>         at 
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
>         at 
> org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
>         at 
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
>
>

Reply via email to