Hello,

Thank you for the details.
Adding Reshuffle() in between prevents fusion and spreads the queries across multiple workers, which is what I expected.

Cheers,
Jaehyeon

On Fri, 4 Oct 2024 at 06:57, Kenneth Knowles <k...@apache.org> wrote:

> This is due to fusion.
>
> The semantic description of the pipeline is:
>
> - The ParDo(GenerateQueryFn) outputs each query as a separate element.
> - The ParDo(ProcessQueryFn) processes the elements, and it is up to the
> FlinkRunner, and also the Python SDK harness, how they are provided.
>
> In practice, what happens is:
>
> - ParDo(GenerateQueryFn) + ParDo(ProcessQueryFn) are fused into a single
> Python ExecutableStage.
> - The (non-Python) FlinkRunner executes them together by sending the
> processing request to the Python SDK harness in its separate process. In
> the Python SDK harness there is no queue, just direct calls between
> GenerateQueryFn and ProcessQueryFn, so no parallelism is added between
> steps in a fused stage.
>
> The in-process Python DirectRunner does not benefit from fusion, and I
> don't think it performs it (unless as a debug tool; I haven't read its
> code lately). I would actually expect it to also not execute them in
> parallel. People who know more about the Python SDK and the direct runner
> may have better insight.
>
> Kenn
>
> On Wed, Oct 2, 2024 at 6:31 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a simple pipeline that prints a number of SQL query strings. A
>> total of 9 query strings are generated by a DoFn called *GenerateQueryFn*,
>> and they are pushed into a splittable DoFn named *ProcessQueryFn*. An
>> OffsetRange is used as the restriction, and its start and stop values are
>> set to 0 and 1 respectively, because each element just needs to print a
>> query string as a whole.
>>
>> import argparse
>> import logging
>> import typing
>> import time
>>
>> import apache_beam as beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.options.pipeline_options import SetupOptions
>> from apache_beam import RestrictionProvider
>> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
>>
>>
>> class GenerateQueryFn(beam.DoFn):
>>     def process(self, element: typing.Any):
>>         for query in self._generate_queries(None):
>>             yield query
>>
>>     def _generate_queries(self, query_config):
>>         if query_config is None:
>>             logging.warning("To provide query config...")
>>
>>         return [
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1112 AND id < 2223",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 2223 AND id < 3334",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 3334 AND id < 4445",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 4445 AND id < 5556",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 5556 AND id < 6667",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 6667 AND id < 7778",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 7778 AND id < 8889",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 8889 AND id <= 10000",
>>         ]
>>
>>
>> class ProcessQueryFn(beam.DoFn, RestrictionProvider):
>>     def process(
>>         self,
>>         query: str,
>>         tracker: OffsetRestrictionTracker = beam.DoFn.RestrictionParam(),
>>     ):
>>         restriction = tracker.current_restriction()
>>         # OffsetRange is half-open [start, stop), so stop itself is never claimable.
>>         for current_position in range(restriction.start, restriction.stop):
>>             if tracker.try_claim(current_position):
>>                 print(query)
>>                 time.sleep(2)
>>                 yield query
>>             else:
>>                 return
>>
>>     def create_tracker(self, restriction: OffsetRange) -> OffsetRestrictionTracker:
>>         return OffsetRestrictionTracker(restriction)
>>
>>     def initial_restriction(self, element: str) -> OffsetRange:
>>         return OffsetRange(start=0, stop=1)
>>
>>     def restriction_size(self, element: str, restriction: OffsetRange) -> int:
>>         return restriction.size()
>>
>>
>> def run(argv=None, save_main_session=True):
>>     parser = argparse.ArgumentParser(description="Beam pipeline arguments")
>>     known_args, pipeline_args = parser.parse_known_args(argv)
>>
>>     # We use the save_main_session option because one or more DoFns in this
>>     # workflow rely on global context (e.g., a module imported at module level).
>>     pipeline_options = PipelineOptions(pipeline_args)
>>     pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>>     print(f"known args - {known_args}")
>>     print(f"pipeline options - {pipeline_options.display_data()}")
>>
>>     with beam.Pipeline(options=pipeline_options) as p:
>>         (
>>             p
>>             | beam.Create([0])
>>             | beam.ParDo(GenerateQueryFn())
>>             | beam.ParDo(ProcessQueryFn())
>>         )
>>
>>     logging.getLogger().setLevel(logging.WARN)
>>     logging.info("Building pipeline ...")
>>
>>
>> if __name__ == "__main__":
>>     run()
>>
>> When I execute the pipeline with 3 workers on the Python direct runner,
>> the elements appear to be processed in parallel, which is expected: three
>> query strings are processed at a time.
>>
>> $ python pipeline.py --direct_num_workers=3 --direct_running_mode=multi_threading
>> known args - Namespace()
>> pipeline options - {'direct_num_workers': 3, 'direct_running_mode': 'multi_threading', 'save_main_session': True}
>> WARNING:root:To provide query config...
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 2223 AND id < 3334
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1112 AND id < 2223
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 3334 AND id < 4445
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 5556 AND id < 6667
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 4445 AND id < 5556
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 6667 AND id < 7778
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 7778 AND id < 8889
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 8889 AND id <= 10000
>>
>> When I execute it on the Flink runner where the parallelism is set to 3,
>> the elements seem to be processed by a single worker. I use Apache Beam
>> 2.57.0 and Flink 1.18.1.
>>
>> $ python pipeline.py --job_name=sql-query --runner FlinkRunner \
>>     --flink_master=localhost:8081 --environment_type=LOOPBACK --parallelism=3
>> known args - Namespace()
>> pipeline options - {'runner': 'FlinkRunner', 'job_name': 'sql-query', 'save_main_session': True, 'environment_type': 'LOOPBACK', 'flink_master': 'localhost:8081', 'parallelism': 3}
>> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
>> WARNING:root:To provide query config...
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1112 AND id < 2223
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 2223 AND id < 3334
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 3334 AND id < 4445
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 4445 AND id < 5556
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 5556 AND id < 6667
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 6667 AND id < 7778
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 7778 AND id < 8889
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 8889 AND id <= 10000
>>
>> I can see the same thing in the Flink UI: only a single worker receives
>> data while the others are idle.
>>
>> [image: screenshot of the Flink UI]
>>
>> Can you please explain why the elements are not processed in parallel
>> on the Flink runner?
>>
>> Cheers,
>> Jaehyeon
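GenerateQueryFn in the thread hard-codes nine id ranges and logs "To provide query config...", which suggests the ranges are eventually meant to come from configuration. The helper below is a hypothetical sketch of that idea. The function name, its parameters, and the assumption that ids are integers spread over a known inclusive range are all illustrative, not part of the original code. With its defaults it reproduces the nine query strings from the thread.

```python
from typing import List

BASE_QUERY = (
    "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)"
    " AS subquery"
)


def generate_range_queries(min_id: int = 1, max_id: int = 10000,
                           num_queries: int = 9) -> List[str]:
    """Split the inclusive id range [min_id, max_id] into num_queries queries.

    Every range is half-open except the last one, which is closed so that
    max_id itself is covered; any remainder from the integer division also
    ends up in the last range.
    """
    if num_queries < 1:
        raise ValueError("num_queries must be at least 1")
    step = (max_id - min_id) // num_queries
    if step < 1:
        raise ValueError("id range is too small for the requested number of queries")

    queries = []
    for i in range(num_queries):
        lower = min_id + i * step
        if i < num_queries - 1:
            # Half-open range: the upper bound is the next range's lower bound.
            queries.append(f"{BASE_QUERY} WHERE id >= {lower} AND id < {lower + step}")
        else:
            # Closed last range so that max_id is included.
            queries.append(f"{BASE_QUERY} WHERE id >= {lower} AND id <= {max_id}")
    return queries


if __name__ == "__main__":
    for query in generate_range_queries():
        print(query)
```

Because consecutive ranges share a boundary (each upper bound is the next lower bound), no id is queried twice or skipped. Its return value could stand in for the hard-coded list returned by `_generate_queries`.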