Thanks. It works after specifying the output type.

On Mon, 18 Mar 2024 at 01:44, XQ Hu via user <[email protected]> wrote:

> Here is what I did, including how I set up the portable runner with Flink:
>
> 1. Start the local Flink cluster
> 2. Start the Flink job server and point it to that local cluster:
>    docker run --net=host apache/beam_flink1.16_job_server:latest --flink-master=localhost:8081
> 3. I use these pipeline options in the code:
>    options = PipelineOptions(parallelism=1, environment_type="LOOPBACK",
>                              job_endpoint="localhost:8099", streaming=True)
> 4. The key, I think, is to explicitly specify the output types for
>    TestStream, like this:
>    TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
>
> These at least work for me.
>
> On Thu, Mar 14, 2024 at 4:37 PM Jaehyeon Kim <[email protected]> wrote:
>
>> Hello,
>>
>> I am trying a simple word count pipeline in a streaming environment using
>> TestStream (Python SDK). While it works with the DirectRunner, it fails on
>> the FlinkRunner with the following error. It looks like a type casting
>> issue.
>>
>> Traceback (most recent call last):
>>   File "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", line 78, in <module>
>>     run()
>>   File "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", line 74, in run
>>     p.run().wait_until_finish()
>>   File "/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 576, in wait_until_finish
>>     raise self._runtime_exception
>> RuntimeError: Pipeline BeamApp-jaehyeon-0314203421-dfc96365_ba750d30-ff27-439d-a6ad-ce835f88fdf7 failed in state FAILED: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
>>
>> Could you please tell me how to fix it? The pipeline code is below.
>>
>> Cheers,
>> Jaehyeon
>>
>> import os
>> import datetime
>> import argparse
>> import logging
>> import re
>>
>> import apache_beam as beam
>> from apache_beam.coders import coders
>> from apache_beam.transforms import window
>> from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
>> from apache_beam.testing.test_stream import TestStream
>> from apache_beam.transforms.window import TimestampedValue
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.options.pipeline_options import StandardOptions
>>
>>
>> def read_file(filename: str, inputpath: str):
>>     with open(os.path.join(inputpath, filename), "r") as f:
>>         return f.readlines()
>>
>>
>> def tokenize(element: str):
>>     return re.findall(r"[A-Za-z\']+", element)
>>
>>
>> def run():
>>     parser = argparse.ArgumentParser(description="Beam pipeline arguments")
>>     parser.add_argument(
>>         "--inputs",
>>         default="inputs",
>>         help="Specify folder name that event records are saved",
>>     )
>>     parser.add_argument(
>>         "--runner", default="DirectRunner", help="Specify Apache Beam Runner"
>>     )
>>     opts = parser.parse_args()
>>     # PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
>>
>>     options = PipelineOptions()
>>     options.view_as(StandardOptions).runner = opts.runner
>>
>>     lines = [
>>         "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Vestibulum erat nulla, ullamcorper nec, rutrum non, nonummy ac, erat. Nulla non lectus sed nisl molestie malesuada. Cras elementum. Integer in sapien. Mauris elementum mauris vitae tortor. Aliquam ante. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. In enim a arcu imperdiet malesuada. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Aliquam erat volutpat. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. In enim a arcu imperdiet malesuada. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos hymenaeos.",
>>         "Duis pulvinar. Integer pellentesque quam vel velit. Sed convallis magna eu sem. Phasellus rhoncus. Aliquam erat volutpat. Quisque porta. Maecenas fermentum, sem in pharetra pellentesque, velit turpis volutpat ante, in pharetra metus odio a lectus. Fusce suscipit libero eget elit. Curabitur vitae diam non enim vestibulum interdum. Nam quis nulla. Etiam dui sem, fermentum vitae, sagittis id, malesuada in, quam. Aliquam ornare wisi eu metus. Aenean vel massa quis mauris vehicula lacinia. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus.",
>>     ]
>>     # lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
>>     now = int(datetime.datetime.now().timestamp() * 1000)
>>     test_stream = (
>>         TestStream(coder=coders.StrUtf8Coder())
>>         .add_elements(
>>             [TimestampedValue(lines[i], now + 1000) for i in range(len(lines))]
>>         )
>>         .advance_watermark_to_infinity()
>>     )
>>
>>     p = beam.Pipeline(options=options)
>>     (
>>         p
>>         | "Read stream" >> test_stream
>>         | "Extract words" >> beam.FlatMap(tokenize)
>>         | "Windowing"
>>         >> beam.WindowInto(
>>             window.GlobalWindows(),
>>             trigger=AfterWatermark(),
>>             accumulation_mode=AccumulationMode.DISCARDING,
>>         )
>>         | "Count per word" >> beam.combiners.Count.PerElement()
>>         | beam.Map(print)
>>     )
>>
>>     logging.getLogger().setLevel(logging.INFO)
>>     logging.info("Building pipeline ...")
>>
>>     p.run().wait_until_finish()
>>
>>
>> if __name__ == "__main__":
>>     run()
>>
>>
