Thanks. It works after specifying the output type.
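For reference, a minimal sketch of what that change looks like on the TestStream, i.e. declaring the Python output type alongside the element coder as suggested in step 4 below; the sample element here is illustrative only:

from apache_beam.coders import coders
from apache_beam.testing.test_stream import TestStream

# Sketch only: declare both the element coder and the Python output type
# on the TestStream (the sample element is illustrative).
test_stream = (
    TestStream(coder=coders.StrUtf8Coder())
    .with_output_types(str)
    .add_elements(["Lorem ipsum dolor sit amet"])
    .advance_watermark_to_infinity()
)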
On Mon, 18 Mar 2024 at 01:44, XQ Hu via user <[email protected]> wrote:

> Here is what I did, including how I set up the portable runner with Flink:
>
> 1. Start the local Flink cluster.
> 2. Start the Flink job server and point it to that local cluster:
>    docker run --net=host apache/beam_flink1.16_job_server:latest --flink-master=localhost:8081
> 3. I use these pipeline options in the code:
>    options = PipelineOptions(parallelism=1, environment_type="LOOPBACK", job_endpoint="localhost:8099", streaming=True)
> 4. The key, I think, is to explicitly specify the output types for TestStream, like this:
>    TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
>
> These at least work for me.
>
> On Thu, Mar 14, 2024 at 4:37 PM Jaehyeon Kim <[email protected]> wrote:
>
>> Hello,
>>
>> I am trying a simple word count pipeline in a streaming environment using
>> TestStream (Python SDK). While it works with the DirectRunner, it fails on
>> the FlinkRunner with the following error. It looks like a type casting issue.
>>
>> Traceback (most recent call last):
>>   File "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", line 78, in <module>
>>     run()
>>   File "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py", line 74, in run
>>     p.run().wait_until_finish()
>>   File "/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 576, in wait_until_finish
>>     raise self._runtime_exception
>> RuntimeError: Pipeline BeamApp-jaehyeon-0314203421-dfc96365_ba750d30-ff27-439d-a6ad-ce835f88fdf7 failed in state FAILED: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
>>
>> Can you please inform me how to fix it? The pipeline code is shown below.
>>
>> Cheers,
>> Jaehyeon
>>
>> import os
>> import datetime
>> import argparse
>> import logging
>> import re
>>
>> import apache_beam as beam
>> from apache_beam.coders import coders
>> from apache_beam.transforms import window
>> from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
>> from apache_beam.testing.test_stream import TestStream
>> from apache_beam.transforms.window import TimestampedValue
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.options.pipeline_options import StandardOptions
>>
>>
>> def read_file(filename: str, inputpath: str):
>>     with open(os.path.join(inputpath, filename), "r") as f:
>>         return f.readlines()
>>
>>
>> def tokenize(element: str):
>>     return re.findall(r"[A-Za-z\']+", element)
>>
>>
>> def run():
>>     parser = argparse.ArgumentParser(description="Beam pipeline arguments")
>>     parser.add_argument(
>>         "--inputs",
>>         default="inputs",
>>         help="Specify folder name that event records are saved",
>>     )
>>     parser.add_argument(
>>         "--runner", default="DirectRunner", help="Specify Apache Beam Runner"
>>     )
>>     opts = parser.parse_args()
>>     # PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
>>
>>     options = PipelineOptions()
>>     options.view_as(StandardOptions).runner = opts.runner
>>
>>     lines = [
>>         "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Vestibulum erat nulla, ullamcorper nec, rutrum non, nonummy ac, erat. Nulla non lectus sed nisl molestie malesuada. Cras elementum. Integer in sapien. Mauris elementum mauris vitae tortor. Aliquam ante. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. In enim a arcu imperdiet malesuada. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Aliquam erat volutpat. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. In enim a arcu imperdiet malesuada. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos hymenaeos."
>>         "Duis pulvinar. Integer pellentesque quam vel velit. Sed convallis magna eu sem. Phasellus rhoncus. Aliquam erat volutpat. Quisque porta. Maecenas fermentum, sem in pharetra pellentesque, velit turpis volutpat ante, in pharetra metus odio a lectus. Fusce suscipit libero eget elit. Curabitur vitae diam non enim vestibulum interdum. Nam quis nulla. Etiam dui sem, fermentum vitae, sagittis id, malesuada in, quam. Aliquam ornare wisi eu metus. Aenean vel massa quis mauris vehicula lacinia. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus."
>>     ]
>>     # lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
>>     now = int(datetime.datetime.now().timestamp() * 1000)
>>     test_stream = (
>>         TestStream(coder=coders.StrUtf8Coder())
>>         .add_elements(
>>             [TimestampedValue(lines[i], now + 1000) for i in range(len(lines))]
>>         )
>>         .advance_watermark_to_infinity()
>>     )
>>
>>     p = beam.Pipeline(options=options)
>>     (
>>         p
>>         | "Read stream" >> test_stream
>>         | "Extract words" >> beam.FlatMap(tokenize)
>>         | "Windowing"
>>         >> beam.WindowInto(
>>             window.GlobalWindows(),
>>             trigger=AfterWatermark(),
>>             accumulation_mode=AccumulationMode.DISCARDING,
>>         )
>>         | "Count per word" >> beam.combiners.Count.PerElement()
>>         | beam.Map(print)
>>     )
>>
>>     logging.getLogger().setLevel(logging.INFO)
>>     logging.info("Building pipeline ...")
>>
>>     p.run().wait_until_finish()
>>
>>
>> if __name__ == "__main__":
>>     run()
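For completeness, a minimal sketch of the runner setup from steps 2 and 3 of the reply above; the job server command and the option values are the ones quoted there, while selecting the PortableRunner through the options is an assumption on my part:

from apache_beam.options.pipeline_options import PipelineOptions

# Step 2 (run separately): the Flink job server, pointed at a local Flink cluster.
#   docker run --net=host apache/beam_flink1.16_job_server:latest --flink-master=localhost:8081

# Step 3: pipeline options that submit the job through that job server.
# runner="PortableRunner" is an assumption; the reply only lists the other options.
options = PipelineOptions(
    runner="PortableRunner",
    job_endpoint="localhost:8099",
    environment_type="LOOPBACK",
    parallelism=1,
    streaming=True,
)

These would replace the plain PipelineOptions() in the quoted code; the TestStream change from step 4 is still needed as well.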
