This is due to fusion.

The semantic description of the pipeline is:

 - The ParDo(GenerateQueryFn) outputs each query as a separate element.
 - The ParDo(ProcessQueryFn) processes those elements. How they are
delivered to it is up to the FlinkRunner and the Python SDK harness.

In practice what happens is:

 - ParDo(GenerateQueryFn) + ParDo(ProcessQueryFn) are fused into a single
Python ExecutableStage.
 - The FlinkRunner (which is not Python) executes them together by sending
the processing request to the Python SDK harness, which runs in a separate
process. Inside the harness there is no queue between GenerateQueryFn and
ProcessQueryFn, just direct calls, so no parallelism is added between
steps in a fused stage.
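
A rough way to picture the difference is the plain-Python sketch below. It
is not Beam code and does not reproduce what the FlinkRunner or the SDK
harness actually do; the names generate_queries, process_query, run_fused
and run_with_boundary are hypothetical stand-ins chosen for illustration.
The first function hands each element to the next step by a direct call,
the second materializes the elements and redistributes them to workers.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the two DoFns; plain Python, no Beam involved.
def generate_queries(_seed):
    return [f"query-{i}" for i in range(9)]


def process_query(query, seen_threads):
    # Record which thread handled the element, then simulate slow work.
    seen_threads.add(threading.current_thread().name)
    time.sleep(0.05)
    return query


def run_fused(seed):
    """Fused stage: each output is passed to the next step by a direct call."""
    seen = set()
    results = [process_query(q, seen) for q in generate_queries(seed)]
    return results, seen


def run_with_boundary(seed, workers=3):
    """Unfused: outputs are materialized first, then spread over workers."""
    seen = set()
    queries = generate_queries(seed)  # the first step completes here
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(lambda q: process_query(q, seen), queries))
    return results, seen


fused_results, fused_threads = run_fused(0)
split_results, split_threads = run_with_boundary(0)
print("threads used when fused:", len(fused_threads))
print("threads used with a boundary:", len(split_threads))
```

Both variants return the same elements in the same order; only the number
of threads doing the work differs. In Beam, the usual way to introduce such
a boundary is a fusion break such as beam.Reshuffle() between the two
ParDos. Whether that alone gives the desired parallelism for this pipeline
on Flink is an assumption that has not been checked here.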

The in-process Python DirectRunner does not benefit from fusion, and I don't
think it performs it (except perhaps as a debugging tool; I haven't read its
code lately). I would actually expect it not to execute the steps in parallel
either. People who know more about the Python SDK and the direct runner may
have better insight.

Kenn

On Wed, Oct 2, 2024 at 6:31 PM Jaehyeon Kim <dott...@gmail.com> wrote:

> Hello,
>
> I have a simple pipeline that prints a number of SQL query strings. A
> total of 9 query strings are generated by a DoFn called *GenerateQueryFn* and
> those are pushed into a splittable DoFn named *ProcessQueryFn*. The
> OffsetRange is used as a restriction, and the start and stop values are set
> to 0 and 1 respectively because it just needs to print a query string as a
> whole.
>
> import argparse
> import logging
> import typing
> import time
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> from apache_beam import RestrictionProvider
> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
>
>
> class GenerateQueryFn(beam.DoFn):
>     def process(self, element: typing.Any):
>         for query in self._generate_queries(None):
>             yield query
>
>     def _generate_queries(self, query_config):
>         if query_config is None:
>             logging.warning("To provide query config...")
>
>         return [
>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112",
>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1112 AND id < 2223",
>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 2223 AND id < 3334",
>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 3334 AND id < 4445",
>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 4445 AND id < 5556",
>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 5556 AND id < 6667",
>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 6667 AND id < 7778",
>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 7778 AND id < 8889",
>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 8889 AND id <= 10000",
>         ]
>
>
> class ProcessQueryFn(beam.DoFn, RestrictionProvider):
>     def process(
>         self,
>         query: str,
>         tracker: OffsetRestrictionTracker = beam.DoFn.RestrictionParam(),
>     ):
>         restriction = tracker.current_restriction()
>         for current_position in range(restriction.start, restriction.stop + 1):
>             if tracker.try_claim(current_position):
>                 print(query)
>                 time.sleep(2)
>                 yield query
>             else:
>                 return
>
>     def create_tracker(self, restriction: OffsetRange) -> OffsetRestrictionTracker:
>         return OffsetRestrictionTracker(restriction)
>
>     def initial_restriction(self, element: str) -> OffsetRange:
>         return OffsetRange(start=0, stop=1)
>
>     def restriction_size(self, element: str, restriction: OffsetRange) -> int:
>         return restriction.size()
>
>
> def run(argv=None, save_main_session=True):
>     parser = argparse.ArgumentParser(description="Beam pipeline arguments")
>     known_args, pipeline_args = parser.parse_known_args(argv)
>
>     # We use the save_main_session option because one or more DoFn's in this
>     # workflow rely on global context (e.g., a module imported at module level).
>     pipeline_options = PipelineOptions(pipeline_args)
>     pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>     print(f"known args - {known_args}")
>     print(f"pipeline options - {pipeline_options.display_data()}")
>
>     with beam.Pipeline(options=pipeline_options) as p:
>         (
>             p
>             | beam.Create([0])
>             | beam.ParDo(GenerateQueryFn())
>             | beam.ParDo(ProcessQueryFn())
>         )
>
>         logging.getLogger().setLevel(logging.WARN)
>         logging.info("Building pipeline ...")
>
>
> if __name__ == "__main__":
>     run()
>
> When I execute the pipeline with 3 workers on the Python direct runner, the
> elements appear to be processed in parallel, as expected: three query
> strings are processed at a time.
>
> $ python pipeline.py --direct_num_workers=3 --direct_running_mode=multi_threading
> known args - Namespace()
> pipeline options - {'direct_num_workers': 3, 'direct_running_mode': 'multi_threading', 'save_main_session': True}
> WARNING:root:To provide query config...
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 2223 AND id < 3334
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1112 AND id < 2223
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 3334 AND id < 4445
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 5556 AND id < 6667
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 4445 AND id < 5556
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 6667 AND id < 7778
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 7778 AND id < 8889
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 8889 AND id <= 10000
>
> When I execute it on the Flink runner where the parallelism is set to 3,
> the elements seem to be processed by a single worker. I use Apache Beam
> 2.57.0 and Flink 1.18.1.
>
> $ python pipeline.py --job_name=sql-query --runner FlinkRunner \
>   --flink_master=localhost:8081 --environment_type=LOOPBACK --parallelism=3
> known args - Namespace()
> pipeline options - {'runner': 'FlinkRunner', 'job_name': 'sql-query', 'save_main_session': True, 'environment_type': 'LOOPBACK', 'flink_master': 'localhost:8081', 'parallelism': 3}
> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
> WARNING:root:To provide query config...
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1112 AND id < 2223
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 2223 AND id < 3334
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 3334 AND id < 4445
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 4445 AND id < 5556
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 5556 AND id < 6667
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 6667 AND id < 7778
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 7778 AND id < 8889
> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 8889 AND id <= 10000
>
> I can check that on the Flink UI as well. It shows only a single worker
> receives data while the others are idle.
>
> [image: image.png]
> Can you please inform me why the elements are not processed in parallel on
> the Flink runner?
>
> Cheers,
> Jaehyeon