Hello,

Thank you for the details.

Adding beam.Reshuffle() between the two ParDo steps prevents fusion and
spreads the queries across multiple workers, which is what I expected.

Cheers,
Jaehyeon

On Fri, 4 Oct 2024 at 06:57, Kenneth Knowles <k...@apache.org> wrote:

> This is due to fusion.
>
> Semantic description of the pipeline is:
>
>  - The ParDo(GenerateQueryFn) outputs each query as a separate element.
>  - The ParDo(ProcessQueryFn) processes the elements, and it is up to the
> FlinkRunner, and also the Python SDK harness, how they are delivered to it.
>
> In practice what happens is:
>
>  - ParDo(GenerateQueryFn) + ParDo(ProcessQueryFn) are fused into a single
> Python ExecutableStage.
>  - The (non-Python) FlinkRunner executes them together by sending the
> processing request to the Python SDK harness, which runs in a separate
> process. In the Python SDK harness there is no queue, just direct calls
> from GenerateQueryFn to ProcessQueryFn, so no parallelism is added
> between steps in a fused stage.
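
As a rough illustration of the fused behaviour described above, here is a toy model in plain Python. It is not how the Beam SDK harness or the FlinkRunner is implemented; run_fused, run_with_redistribution, and the "query-N" placeholders are hypothetical names, and a thread pool stands in for the runner's workers.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Placeholder strings standing in for the nine SQL queries.
QUERIES = [f"query-{i}" for i in range(9)]


def generate(_seed):
    """Toy stand-in for the first DoFn: one input element, nine outputs."""
    yield from QUERIES


def process(query, seen_threads):
    """Toy stand-in for the second DoFn: record which thread did the work."""
    seen_threads.add(threading.current_thread().name)
    time.sleep(0.05)
    return query


def run_fused(seeds):
    # Fused stage: every output of generate() is passed to process() by a
    # direct call on the same thread, so one seed means one busy worker.
    seen = set()
    results = [process(q, seen) for seed in seeds for q in generate(seed)]
    return results, seen


def run_with_redistribution(seeds, workers=3):
    # Unfused: outputs are materialized first, then handed out to a pool,
    # which is the role a redistribution step plays between the two stages.
    seen = set()
    materialized = [q for seed in seeds for q in generate(seed)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(lambda q: process(q, seen), materialized))
    return results, seen


if __name__ == "__main__":
    print("fused threads:", run_fused([0])[1])
    print("redistributed threads:", run_with_redistribution([0])[1])
```

With a single seed element, the fused version can only ever use the thread that received that element, which matches the single busy worker seen in the Flink UI.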
>
> The in-process Python DirectRunner does not benefit from fusion, and I
> don't think it performs it (unless as a debug tool; I haven't read its code
> lately). I would actually expect it not to execute them in parallel either.
> People who know more about the Python SDK and direct runner may have better
> insight.
>
> Kenn
>
> On Wed, Oct 2, 2024 at 6:31 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a simple pipeline that prints a number of SQL query strings. A
>> total of 9 query strings are generated by a DoFn called *GenerateQueryFn* and
>> those are pushed into a splittable DoFn named *ProcessQueryFn*. The
>> OffsetRange is used as a restriction, and the start and stop values are set
>> to 0 and 1 respectively because it just needs to print a query string as a
>> whole.
>>
>> import argparse
>> import logging
>> import typing
>> import time
>>
>> import apache_beam as beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.options.pipeline_options import SetupOptions
>> from apache_beam import RestrictionProvider
>> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
>>
>>
>> class GenerateQueryFn(beam.DoFn):
>>     def process(self, element: typing.Any):
>>         for query in self._generate_queries(None):
>>             yield query
>>
>>     def _generate_queries(self, query_config):
>>         if query_config is None:
>>             logging.warning("To provide query config...")
>>
>>         return [
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1112 AND id < 2223",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 2223 AND id < 3334",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 3334 AND id < 4445",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 4445 AND id < 5556",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 5556 AND id < 6667",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 6667 AND id < 7778",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 7778 AND id < 8889",
>>             "SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 8889 AND id <= 10000",
>>         ]
>>
>>
>> class ProcessQueryFn(beam.DoFn, RestrictionProvider):
>>     def process(
>>         self,
>>         query: str,
>>         tracker: OffsetRestrictionTracker = beam.DoFn.RestrictionParam(),
>>     ):
>>         restriction = tracker.current_restriction()
>>         for current_position in range(restriction.start, restriction.stop + 1):
>>             if tracker.try_claim(current_position):
>>                 print(query)
>>                 time.sleep(2)
>>                 yield query
>>             else:
>>                 return
>>
>>     def create_tracker(self, restriction: OffsetRange) -> OffsetRestrictionTracker:
>>         return OffsetRestrictionTracker(restriction)
>>
>>     def initial_restriction(self, element: str) -> OffsetRange:
>>         return OffsetRange(start=0, stop=1)
>>
>>     def restriction_size(self, element: str, restriction: OffsetRange) -> int:
>>         return restriction.size()
>>
>>
>> def run(argv=None, save_main_session=True):
>>     parser = argparse.ArgumentParser(description="Beam pipeline arguments")
>>     known_args, pipeline_args = parser.parse_known_args(argv)
>>
>>     # We use the save_main_session option because one or more DoFns in this
>>     # workflow rely on global context (e.g., a module imported at module level).
>>     pipeline_options = PipelineOptions(pipeline_args)
>>     pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>>     print(f"known args - {known_args}")
>>     print(f"pipeline options - {pipeline_options.display_data()}")
>>
>>     with beam.Pipeline(options=pipeline_options) as p:
>>         (
>>             p
>>             | beam.Create([0])
>>             | beam.ParDo(GenerateQueryFn())
>>             | beam.ParDo(ProcessQueryFn())
>>         )
>>
>>         logging.getLogger().setLevel(logging.WARN)
>>         logging.info("Building pipeline ...")
>>
>>
>> if __name__ == "__main__":
>>     run()
>>
>> When I execute the pipeline with 3 workers on the Python direct runner,
>> the elements appear to be processed in parallel, as expected: three
>> query strings are processed at a time.
>>
>> $ python pipeline.py --direct_num_workers=3 \
>>   --direct_running_mode=multi_threading
>> known args - Namespace()
>> pipeline options - {'direct_num_workers': 3, 'direct_running_mode':
>> 'multi_threading', 'save_main_session': True}
>> WARNING:root:To provide query config...
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 1 AND id < 1112
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 2223 AND id < 3334
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 1112 AND id < 2223
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 3334 AND id < 4445
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 5556 AND id < 6667
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 4445 AND id < 5556
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 6667 AND id < 7778
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 7778 AND id < 8889
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 8889 AND id <= 10000
>>
>> When I execute it on the Flink runner where the parallelism is set to 3,
>> the elements seem to be processed by a single worker. I use Apache Beam
>> 2.57.0 and Flink 1.18.1.
>>
>> $ python pipeline.py --job_name=sql-query --runner FlinkRunner \
>>   --flink_master=localhost:8081 --environment_type=LOOPBACK \
>>   --parallelism=3
>> known args - Namespace()
>> pipeline options - {'runner': 'FlinkRunner', 'job_name': 'sql-query',
>> 'save_main_session': True, 'environment_type': 'LOOPBACK', 'flink_master':
>> 'localhost:8081', 'parallelism': 3}
>> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
>> WARNING:root:To provide query config...
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 1 AND id < 1112
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 1112 AND id < 2223
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 2223 AND id < 3334
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 3334 AND id < 4445
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 4445 AND id < 5556
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 5556 AND id < 6667
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 6667 AND id < 7778
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 7778 AND id < 8889
>> SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
>> AS subquery WHERE id >= 8889 AND id <= 10000
>>
>> I can check that on the Flink UI as well. It shows only a single worker
>> receives data while the others are idle.
>>
>> [image: image.png]
>>
>> Could you please explain why the elements are not processed in parallel
>> on the Flink runner?
>>
>> Cheers,
>> Jaehyeon
>>
