[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2020-01-10 Thread Hannah Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 1/10/20 8:15 PM:
-

*{color:#ff0000}Update on 01/10/2020{color}*

We can use --direct_running_mode (available from Beam 2.19.0) to switch between multi_threading and 
multi_processing.
direct_running_mode can be one of ['in_memory', 'multi_threading', 
'multi_processing']. The default mode is in_memory.

*in_memory*: multi-threading mode; the workers and the runner communicate in 
memory (not over gRPC).
 *multi_threading*: multi-threading mode; the workers and the runner communicate 
over gRPC.
 *multi_processing*: multi-processing mode; the workers and the runner communicate 
over gRPC.

Here is how to set the direct_running_mode.
 *Option 1*: set it with PipelineOptions().
{code:java}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner

pipeline_options = PipelineOptions(direct_num_workers=2,
                                   direct_running_mode='multi_threading')
p = beam.Pipeline(
        runner=fn_api_runner.FnApiRunner(),
        options=pipeline_options)
{code}

*Option 2*: pass it from the CLI.
{code:java}
 python xxx --direct_num_workers 2 --direct_running_mode multi_threading

# inside the pipeline script, forward the unparsed args to PipelineOptions:
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(
        runner=fn_api_runner.FnApiRunner(),
        options=pipeline_options)
{code}
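
For reference, here is a minimal end-to-end sketch that combines both options. The element values and the slow_square function are made up purely for illustration (a CPU-bound step is where multi_processing helps, since it sidesteps the Python GIL); it assumes Beam 2.19+.
{code:java}
import multiprocessing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner


def slow_square(x):
  # stand-in for a CPU-bound step that benefits from multi_processing
  return x * x


pipeline_options = PipelineOptions(
    direct_num_workers=multiprocessing.cpu_count(),
    direct_running_mode='multi_processing')

with beam.Pipeline(runner=fn_api_runner.FnApiRunner(),
                   options=pipeline_options) as p:
  (p
   | beam.Create(range(100))
   | beam.Map(slow_square)
   | beam.Map(print))
{code}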
 

{color:#ff0000}*Update on 30/06/2018.*{color}

The direct runner can now process map tasks across multiple workers. Depending on 
the running environment, these workers run in multithreading or 
multiprocessing mode.

_*It is supported from Beam 2.15.*_

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
import sys
import apache_beam as beam
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.SUBPROCESS_SDK,
      payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
        sys.executable.encode('ascii'))))
{code}
 

*Run with multithreading mode:*
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1')))  # payload is the number of threads per worker
{code}
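
For comparison, the in-memory embedded runner (the runner and the workers communicate in memory, no gRPC; this roughly corresponds to the in_memory mode described above) needs no explicit environment at all:
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}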
 

The *--direct_num_workers* option controls the parallelism. The default value is 
1. 
{code:java}
# an example passing it from the CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example setting it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example adding it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2
{code}
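
Putting these pieces together, here is a sketch of a two-worker multi-process run on Beam 2.15+ (before the direct_running_mode option was added in 2.19), combining the subprocess environment from above with direct_num_workers; treat it as an illustrative assembly of the snippets above rather than the only way to wire this up.
{code:java}
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import DirectOptions, PipelineOptions
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

pipeline_options = PipelineOptions()
pipeline_options.view_as(DirectOptions).direct_num_workers = 2

p = beam.Pipeline(
    options=pipeline_options,
    runner=fn_api_runner.FnApiRunner(
        default_environment=beam_runner_api_pb2.Environment(
            urn=python_urns.SUBPROCESS_SDK,
            payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
                sys.executable.encode('ascii'))))
{code}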


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-09-06 Thread Yifan Mai (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924585#comment-16924585
 ] 

Yifan Mai edited comment on BEAM-3645 at 9/6/19 8:54 PM:
-

While testing this I noticed that the multi-process runner does not handle 
SIGINT gracefully. I added the repro steps to BEAM-8149.


was (Author: myffi...@gmail.com):
While testing this I noticed that the multi-process runner does not handle 
SIGINT gracefully. To reproduce, run wordcount.py using the "Run with 
multiprocessing mode" instructions from the comment above (in Python 3).

Expected: wordcount terminates gracefully when Ctrl-C is pressed during 
pipeline execution (as with the default direct runner).
Actual: wordcount hangs forever after printing the following once per worker:

{code}
Exception in thread run_worker:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
  File 
"/usr/local/google/home/yifanmai/venv/wordcount/lib/python3.6/site-packages/apache_beam/runners/portability/local_job_service.py",
 line 216, in run
'Worker subprocess exited with return code %s' % p.returncode)
RuntimeError: Worker subprocess exited with return code 1
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-09-03 Thread Hannah Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 9/3/19 10:36 PM:
-

Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

_*It is supported from Beam 2.15.*_

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*Run with multithreading mode:*
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control parallelism. Default value is 
1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

_*It is supported from Beam 2.15.*_

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*Run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control parallelism. Default value is 
1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-13 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/13/19 4:43 PM:
-

Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

_*It is supported from Beam 2.15.*_

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*Run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control parallelism. Default value is 
1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

_*It is supported from Beam 2.15.*_

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*Run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control parallelism. Default value is 
1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-09 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/9/19 6:54 PM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

_*It is supported from Beam 2.15.*_

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*Run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control parallelism. Default value is 
1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*Run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control parallelism. Default value is 
1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-09 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/9/19 6:53 PM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*Run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control parallelism. Default value is 
1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*Run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-08 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/8/19 4:48 PM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*Run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-02 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/2/19 7:03 PM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-02 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/2/19 7:02 PM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with pipeline options directly.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example add it to existing pipeline options.
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-02 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/2/19 6:24 PM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with pipeline options directly.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to an existing pipeline options object.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with pipeline options directly.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to an existing pipeline options object.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-02 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/2/19 6:24 PM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with pipeline options directly.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to an existing pipeline options object.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with pipeline options directly.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to an existing pipeline options object.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-02 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/2/19 6:22 PM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with pipeline options directly.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to an existing pipeline options object.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with pipeline options directly.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to an existing pipeline options object.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-08-02 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/2/19 6:22 PM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2

# an example to set it with pipeline options directly.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# an example to add it to an existing pipeline options object.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/1/19 2:24 AM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 8/1/19 2:24 AM:


Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))) # payload is # of threads of each worker.{code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 8:02 PM:
-

Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running in multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of workers. Default 
value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}
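
One way to check which mode is actually in effect (an illustrative sketch, not part of the original comment) is to log the worker process id from inside a DoFn: with the subprocess environment and direct_num_workers > 1 several distinct pids should show up, while the multithreading modes report a single pid.
{code:java}
import logging
import os

import apache_beam as beam

class LogPid(beam.DoFn):
  def process(self, element):
    # each SDK worker process has its own pid in multiprocessing mode
    logging.info('pid=%s element=%s', os.getpid(), element)
    yield element

# usage inside a pipeline: ... | beam.ParDo(LogPid()) | ...
{code}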


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of partitions for each 
map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 5:54 PM:
-

Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of partitions for each 
map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*

There are multiple options.
{code:java}
# 1. using in-memory runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# 2. using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of partitions for each 
map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 5:29 PM:
-

Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*

There are multiple options.
{code:java}
# 1. using in-memory runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# 2. using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of partitions for each 
map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*

There are multiple runner options.
{code:java}
# 1. using in-memory runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# 2. using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of partitions for each 
map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 5:28 PM:
-

Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*

There are multiple runner options.
{code:java}
# 1. using in-memory runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# 2. using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
 

*--direct_num_workers* option is used to control number of partitions for each 
map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*

There are multiple runner options.
{code:java}
# 1. using in-memory runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# 2. using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
*--direct_num_workers* option is used to control number of partitions for each 
map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 5:13 PM:
-

Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'
{code}
 

*How to run with multithreading mode:*

There are multiple runner options.
{code:java}
# 1. using in-memory runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# 2. using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
*--direct_num_workers* option is used to control number of partitions for each 
map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*

There are multiple runner options.
{code:java}
# 1. using in-memory runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# 2. using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
*--direct_num_workers* option is used to define number of threads/processes for 
each map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 5:09 PM:
-

Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*

There ** are multiple runner options.

 
 # # using in-memory runner
p = beam.Pipeline(options=pipeline_options)
 # # using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1')))

 

 

*--direct_num_workers* option is used to define number of threads/processes for 
each map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
The direct runner can now process map tasks across multiple workers. Depending on
the running environment, these workers run in a multi-threaded or a multi-process
environment.

To run map tasks with multiple processes, we should use the subprocess
environment; workers in the other environments run as multiple threads in a single process.
{code:java}
# create a pipeline with subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*--direct_num_workers* option is used to define number of threads/processes for 
each map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 5:11 PM:
-

Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*

There are multiple runner options.
{code:java}
# 1. using in-memory runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# 2. using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1'))){code}
*--direct_num_workers* option is used to define number of threads/processes for 
each map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on 
running environment, these workers are running with multithreading or 
multiprocessing mode.

*How to run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*How to run with multithreading mode:*

There ** are multiple runner options.

 
 # # using in-memory runner
p = beam.Pipeline(options=pipeline_options)
 # # using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1')))

 

 

*--direct_num_workers* option is used to define number of threads/processes for 
each map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 4:35 PM:
-

The direct runner can now process map tasks across multiple workers. Depending on
the running environment, these workers run in a multi-threaded or a multi-process
environment.

To run map tasks with multiple processes, we should use the subprocess
environment; workers in the other environments run as multiple threads in a single process.
{code:java}
# create a pipeline with subprocess runner
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*--direct_num_workers* option is used to define number of threads/processes for 
each map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
The direct runner can now process map tasks across multiple workers. Depending on
the running environment, these workers run in a multi-threaded or a multi-process
environment.

To run map tasks with multiple processes, we should use the subprocess
environment; workers in the other environments run as multiple threads in a single process.
{code:java}
# create a pipeline at subprocess environment
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*--direct_num_workers* option is used to define number of threads/processes for 
each map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-31 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 4:33 PM:
-

The direct runner can now process map tasks across multiple workers. Depending on
the running environment, these workers run in a multi-threaded or a multi-process
environment.

To run map tasks with multiple processes, we should use the subprocess
environment; workers in the other environments run as multiple threads in a single process.
{code:java}
# create a pipeline at subprocess environment
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
  urn=python_urns.SUBPROCESS_SDK,
  payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
  sys.executable.encode('ascii'))))
{code}
 

*--direct_num_workers* option is used to define number of threads/processes for 
each map task. Default value is 1. 
{code:java}
# an example to pass --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}


was (Author: hannahjiang):
_This summary is pending review to make sure it's 100% correct._

Previously, shuffle data (which we also call a buffer in the FnApi code) was
processed by a single worker. Now the shuffle data, except for
WindowGroupingBuffer, is partitioned into N chunks and each chunk is processed by
a separate worker, which improves processing performance. Depending on the
running environment, the partitioned shuffles can be processed with multiple
threads or multiple processes.

 

The Fn API provides the following running environments (a sketch of how the first three map to environment URNs follows the list):

1. EmbeddedWorkerHandler

2. EmbeddedGrpcWorkerHandler

3. SubprocessSdkWorkerHandler

4.  DockerSdkWorkerHandler

5.  ExternalWorkerHandler
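
As mentioned above, a rough sketch of how the first three handlers map to environment URNs (based on my reading of the runner code at the time; the Docker and external handlers are configured through their own environment types):
{code:java}
from apache_beam.portability import python_urns

# the handler chosen by FnApiRunner is driven by the default_environment URN
EMBEDDED_WORKER   = python_urns.EMBEDDED_PYTHON       # EmbeddedWorkerHandler, same process
EMBEDDED_GRPC     = python_urns.EMBEDDED_PYTHON_GRPC  # EmbeddedGrpcWorkerHandler, threads + gRPC
SUBPROCESS_WORKER = python_urns.SUBPROCESS_SDK        # SubprocessSdkWorkerHandler, separate processes
{code}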

 

Workers running with SubprocessSdkWorkerHandler run as separate processes, so to
process shuffle data with multiple processes we should use this environment.
Workers running with the other worker handlers run within a single process.

The *--direct_num_workers* option defines the number of shuffle partitions; the
default value is 1.

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-30 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/30/19 10:26 PM:
--

_This summary is pending review to make sure it's 100% correct._

Previously, shuffle data (which we also call a buffer in the FnApi code) was
processed by a single worker. Now the shuffle data, except for
WindowGroupingBuffer, is partitioned into N chunks and each chunk is processed by
a separate worker, which improves processing performance. Depending on the
running environment, the partitioned shuffles can be processed with multiple
threads or multiple processes.

 

The Fn API provides the following running environments:

1. EmbeddedWorkerHandler

2. EmbeddedGrpcWorkerHandler

3. SubprocessSdkWorkerHandler

4.  DockerSdkWorkerHandler

5.  ExternalWorkerHandler

 

Workers running with SubprocessSdkWorkerHandler run as separate processes, so to
process shuffle data with multiple processes we should use this environment.
Workers running with the other worker handlers run within a single process.

The *--direct_num_workers* option defines the number of shuffle partitions; the
default value is 1.
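
To make the partitioning concrete, an illustrative sketch (not from the original comment): the grouped output below is what gets split into direct_num_workers partitions, so the stage that consumes the GroupByKey result can run on two workers in parallel.
{code:java}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner

options = PipelineOptions(['--direct_num_workers', '2'])
with beam.Pipeline(runner=fn_api_runner.FnApiRunner(), options=options) as p:
  _ = (p
       | beam.Create([('a', 1), ('b', 2), ('a', 3), ('b', 4)])
       | beam.GroupByKey()  # shuffle boundary; grouped output is partitioned
       | beam.Map(lambda kv: (kv[0], sum(kv[1]))))
{code}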


was (Author: hannahjiang):
_pending review to make sure this summary is 100% correct._

Previously, shuffle data (which we also call a buffer in the FnApi code) was
processed by a single worker. Now the shuffle data, except for
WindowGroupingBuffer, is partitioned into N chunks and each chunk is processed by
a separate worker, which improves processing performance. Depending on the
running environment, the partitioned shuffles can be processed with multiple
threads or multiple processes.

 

The Fn API provides the following running environments:

1. EmbeddedWorkerHandler

2. EmbeddedGrpcWorkerHandler

3. SubprocessSdkWorkerHandler

4.  DockerSdkWorkerHandler

5.  ExternalWorkerHandler

 

Workers running with SubprocessSdkWorkerHandler run as separate processes, so to
process shuffle data with multiple processes we should use this environment.
Workers running with the other worker handlers run within a single process.

The *--direct_num_workers* option defines the number of shuffle partitions; the
default value is 1.

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner

2019-07-30 Thread Hannah Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 7/30/19 10:20 PM:
--

# pending review to make sure this summary is 100% correct.

Previously, shuffle data (which we also call a buffer in the FnApi code) was
processed by a single worker. Now the shuffle data, except for
WindowGroupingBuffer, is partitioned into N chunks and each chunk is processed by
a separate worker, which improves processing performance. Depending on the
running environment, the partitioned shuffles can be processed with multiple
threads or multiple processes.

 

The Fn API provides the following running environments:

1. EmbeddedWorkerHandler

2. EmbeddedGrpcWorkerHandler

3. SubprocessSdkWorkerHandler

4.  DockerSdkWorkerHandler

5.  ExternalWorkerHandler

 

Workers running with SubprocessSdkWorkerHandler run as separate processes, so to
process shuffle data with multiple processes we should use this environment.
Workers running with the other worker handlers run within a single process.

The *--direct_num_workers* option defines the number of shuffle partitions; the
default value is 1.


was (Author: hannahjiang):
# pending review to make sure this summary is 100% correct.

Previously, shuffle data (which we also call a buffer in the FnApi code) was
processed by a single worker. Now the shuffle data, except for
WindowGroupingBuffer, is partitioned into N chunks and each chunk is processed by
a separate worker, which improves processing performance. Depending on the
running environment, the partitioned shuffles can be processed with multiple
threads or multiple processes.

 

The Fn API provides the following running environments:

1. EmbeddedWorkerHandler

2. EmbeddedGrpcWorkerHandler

3. SubprocessSdkWorkerHandler

4.  DockerSdkWorkerHandler

5.  ExternalWorkerHandler

 

Workers running with SubprocessSdkWorkerHandler run as separate processes, so to
process shuffle data with multiple processes we should use this environment.
Workers running with the other worker handlers run within a single process.

The *--direct_num_workers* option defines the number of shuffle partitions; the
default value is 1.

> Support multi-process execution on the FnApiRunner
> --
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Charles Chen
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 35h
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

