Here is what I did, including how I set up the portable runner with Flink: 1. Start the local Flink cluster. 2. Start the Flink job server and point it to that local cluster: docker run --net=host apache/beam_flink1.16_job_server:latest --flink-master=localhost:8081 3. Use these pipeline options in the code: options = PipelineOptions(parallelism=1, environment_type="LOOPBACK", job_endpoint="localhost:8099", streaming=True) 4. The key, I think, is to explicitly specify the output type for TestStream like this: TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
These at least work for me. On Thu, Mar 14, 2024 at 4:37 PM Jaehyeon Kim <[email protected]> wrote: > Hello, > > I am trying a simple word count pipeline in a streaming environment using > TestStream (Python SDK). While it works with the DirectRunner, it fails on > the FlinkRunner with the following error. It looks like a type casting > issue. > > Traceback (most recent call last): > File > "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", > line 78, in <module> > run() > File > "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", > line 74, in run > p.run().wait_until_finish() > File > "/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", > line 576, in wait_until_finish > raise self._runtime_exception > RuntimeError: Pipeline > BeamApp-jaehyeon-0314203421-dfc96365_ba750d30-ff27-439d-a6ad-ce835f88fdf7 > failed in state FAILED: java.lang.ClassCastException: class > java.lang.String cannot be cast to class [B (java.lang.String and [B are in > module java.base of loader 'bootstrap') > > Can you please inform me how to fix it? Below shows the pipeline code. 
> > Cheers, > Jaehyeon > > import os > import datetime > import argparse > import logging > import re > > import apache_beam as beam > from apache_beam.coders import coders > from apache_beam.transforms import window > from apache_beam.transforms.trigger import AfterWatermark, > AccumulationMode > from apache_beam.testing.test_stream import TestStream > from apache_beam.transforms.window import TimestampedValue > from apache_beam.options.pipeline_options import PipelineOptions > from apache_beam.options.pipeline_options import StandardOptions > > > def read_file(filename: str, inputpath: str): > with open(os.path.join(inputpath, filename), "r") as f: > return f.readlines() > > > def tokenize(element: str): > return re.findall(r"[A-Za-z\']+", element) > > > def run(): > parser = argparse.ArgumentParser(description="Beam pipeline arguments" > ) > parser.add_argument( > "--inputs", > default="inputs", > help="Specify folder name that event records are saved", > ) > parser.add_argument( > "--runner", default="DirectRunner", help="Specify Apache Beam > Runner" > ) > opts = parser.parse_args() > # PARENT_DIR = > os.path.dirname(os.path.dirname(os.path.realpath(__file__))) > > options = PipelineOptions() > options.view_as(StandardOptions).runner = opts.runner > > lines = [ > "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. > Vestibulum erat nulla, ullamcorper nec, rutrum non, nonummy ac, erat. Nulla > non lectus sed nisl molestie malesuada. Cras elementum. Integer in sapien. > Mauris elementum mauris vitae tortor. Aliquam ante. Cum sociis natoque > penatibus et magnis dis parturient montes, nascetur ridiculus mus. In enim > a arcu imperdiet malesuada. Neque porro quisquam est, qui dolorem ipsum > quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius > modi tempora incidunt ut labore et dolore magnam aliquam quaerat > voluptatem. Aliquam erat volutpat. 
Excepteur sint occaecat cupidatat non > proident, sunt in culpa qui officia deserunt mollit anim id est laborum. In > enim a arcu imperdiet malesuada. Class aptent taciti sociosqu ad litora > torquent per conubia nostra, per inceptos hymenaeos." > "Duis pulvinar. Integer pellentesque quam vel velit. Sed > convallis magna eu sem. Phasellus rhoncus. Aliquam erat volutpat. Quisque > porta. Maecenas fermentum, sem in pharetra pellentesque, velit turpis > volutpat ante, in pharetra metus odio a lectus. Fusce suscipit libero eget > elit. Curabitur vitae diam non enim vestibulum interdum. Nam quis nulla. > Etiam dui sem, fermentum vitae, sagittis id, malesuada in, quam. Aliquam > ornare wisi eu metus. Aenean vel massa quis mauris vehicula lacinia. Nam > libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit > quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda > est, omnis dolor repellendus." > ] > # lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs")) > now = int(datetime.datetime.now().timestamp() * 1000) > test_stream = ( > TestStream(coder=coders.StrUtf8Coder()) > .add_elements( > [TimestampedValue(lines[i], now + 1000) for i in range(len( > lines))] > ) > .advance_watermark_to_infinity() > ) > > p = beam.Pipeline(options=options) > ( > p > | "Read stream" >> test_stream > | "Extract words" >> beam.FlatMap(tokenize) > | "Windowing" > >> beam.WindowInto( > window.GlobalWindows(), > trigger=AfterWatermark(), > accumulation_mode=AccumulationMode.DISCARDING, > ) > | "Count per word" >> beam.combiners.Count.PerElement() > | beam.Map(print) > ) > > logging.getLogger().setLevel(logging.INFO) > logging.info("Building pipeline ...") > > p.run().wait_until_finish() > > > if __name__ == "__main__": > run() > >
