Here is what I did, including how I set up the portable runner with Flink:

1. Start the local Flink cluster.
2. Start the Flink job server and point it to that local cluster:
   docker run --net=host apache/beam_flink1.16_job_server:latest --flink-master=localhost:8081
3. I use these pipeline options in the code:
   options = PipelineOptions(parallelism=1, environment_type="LOOPBACK", job_endpoint="localhost:8099", streaming=True)
4. The key, I think, is to explicitly specify the output types for TestStream, like this:
   TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)

These steps at least work for me; a rough end-to-end sketch is below.
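To pull those pieces together, here is a minimal sketch of how they fit for me. It assumes the job server and Flink cluster addresses from steps 1-3 (localhost:8099 and localhost:8081); the runner="PortableRunner" option, the one-element input, and the tokenizer are my own additions for illustration (the steps above imply the portable runner via job_endpoint but don't spell it out), so adapt the body to your actual pipeline:

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue

# Steps 2-3: the job server listens on localhost:8099 and submits the job
# to the Flink cluster at localhost:8081.
options = PipelineOptions(
    runner="PortableRunner",  # assumption: select the portable runner explicitly
    job_endpoint="localhost:8099",
    environment_type="LOOPBACK",
    parallelism=1,
    streaming=True,
)

# Step 4: declare both the coder and the output type on TestStream so the
# type hint matches the UTF-8 coder (this is what fixed the
# ClassCastException for me).
test_stream = (
    TestStream(coder=coders.StrUtf8Coder())
    .with_output_types(str)
    .add_elements([TimestampedValue("Lorem ipsum dolor sit amet", 0)])  # placeholder element
    .advance_watermark_to_infinity()
)

with beam.Pipeline(options=options) as p:
    (
        p
        | "Read stream" >> test_stream
        | "Extract words" >> beam.FlatMap(lambda line: line.split())  # placeholder tokenizer
        | "Count per word" >> beam.combiners.Count.PerElement()
        | "Print" >> beam.Map(print)
    )

With LOOPBACK the Python SDK workers run in the process that submits the job, which makes local debugging a lot easier.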

On Thu, Mar 14, 2024 at 4:37 PM Jaehyeon Kim <[email protected]> wrote:

> Hello,
>
> I am trying a simple word count pipeline in a streaming environment using
> TestStream (Python SDK). While it works with the DirectRunner, it fails on
> the FlinkRunner with the following error. It looks like a type casting
> issue.
>
> Traceback (most recent call last):
>   File
> "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py",
> line 78, in <module>
>     run()
>   File
> "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py",
> line 74, in run
>     p.run().wait_until_finish()
>   File
> "/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 576, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline
> BeamApp-jaehyeon-0314203421-dfc96365_ba750d30-ff27-439d-a6ad-ce835f88fdf7
> failed in state FAILED: java.lang.ClassCastException: class
> java.lang.String cannot be cast to class [B (java.lang.String and [B are in
> module java.base of loader 'bootstrap')
>
> Can you please let me know how to fix it? The pipeline code is shown below.
>
> Cheers,
> Jaehyeon
>
> import os
> import datetime
> import argparse
> import logging
> import re
>
> import apache_beam as beam
> from apache_beam.coders import coders
> from apache_beam.transforms import window
> from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
> from apache_beam.testing.test_stream import TestStream
> from apache_beam.transforms.window import TimestampedValue
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
>
>
> def read_file(filename: str, inputpath: str):
>     with open(os.path.join(inputpath, filename), "r") as f:
>         return f.readlines()
>
>
> def tokenize(element: str):
>     return re.findall(r"[A-Za-z\']+", element)
>
>
> def run():
>     parser = argparse.ArgumentParser(description="Beam pipeline arguments")
>     parser.add_argument(
>         "--inputs",
>         default="inputs",
>         help="Specify folder name that event records are saved",
>     )
>     parser.add_argument(
>         "--runner", default="DirectRunner", help="Specify Apache Beam Runner"
>     )
>     opts = parser.parse_args()
>     # PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
>
>     options = PipelineOptions()
>     options.view_as(StandardOptions).runner = opts.runner
>
>     lines = [
>         "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. "
>         "Vestibulum erat nulla, ullamcorper nec, rutrum non, nonummy ac, erat. Nulla "
>         "non lectus sed nisl molestie malesuada. Cras elementum. Integer in sapien. "
>         "Mauris elementum mauris vitae tortor. Aliquam ante. Cum sociis natoque "
>         "penatibus et magnis dis parturient montes, nascetur ridiculus mus. In enim "
>         "a arcu imperdiet malesuada. Neque porro quisquam est, qui dolorem ipsum "
>         "quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius "
>         "modi tempora incidunt ut labore et dolore magnam aliquam quaerat "
>         "voluptatem. Aliquam erat volutpat. Excepteur sint occaecat cupidatat non "
>         "proident, sunt in culpa qui officia deserunt mollit anim id est laborum. In "
>         "enim a arcu imperdiet malesuada. Class aptent taciti sociosqu ad litora "
>         "torquent per conubia nostra, per inceptos hymenaeos."
>         "Duis pulvinar. Integer pellentesque quam vel velit. Sed "
>         "convallis magna eu sem. Phasellus rhoncus. Aliquam erat volutpat. Quisque "
>         "porta. Maecenas fermentum, sem in pharetra pellentesque, velit turpis "
>         "volutpat ante, in pharetra metus odio a lectus. Fusce suscipit libero eget "
>         "elit. Curabitur vitae diam non enim vestibulum interdum. Nam quis nulla. "
>         "Etiam dui sem, fermentum vitae, sagittis id, malesuada in, quam. Aliquam "
>         "ornare wisi eu metus. Aenean vel massa quis mauris vehicula lacinia. Nam "
>         "libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit "
>         "quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda "
>         "est, omnis dolor repellendus."
>     ]
>     # lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
>     now = int(datetime.datetime.now().timestamp() * 1000)
>     test_stream = (
>         TestStream(coder=coders.StrUtf8Coder())
>         .add_elements(
>             [TimestampedValue(lines[i], now + 1000) for i in range(len(lines))]
>         )
>         .advance_watermark_to_infinity()
>     )
>
>     p = beam.Pipeline(options=options)
>     (
>         p
>         | "Read stream" >> test_stream
>         | "Extract words" >> beam.FlatMap(tokenize)
>         | "Windowing"
>         >> beam.WindowInto(
>             window.GlobalWindows(),
>             trigger=AfterWatermark(),
>             accumulation_mode=AccumulationMode.DISCARDING,
>         )
>         | "Count per word" >> beam.combiners.Count.PerElement()
>         | beam.Map(print)
>     )
>
>     logging.getLogger().setLevel(logging.INFO)
>     logging.info("Building pipeline ...")
>
>     p.run().wait_until_finish()
>
>
> if __name__ == "__main__":
>     run()
>
>
