[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-05-01 Thread Luke Cwik (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17097665#comment-17097665
 ] 

Luke Cwik commented on BEAM-8944:
-

I was able to come up with a compromise that I believe should work well for how 
many idle threads sit around; see https://github.com/apache/beam/pull/11590. 
[~mxm], could you try out the patch locally with a larger pipeline to see 
whether it addresses the issue and by how much?
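
A quick way to sanity-check such a change locally (a generic sketch under my own 
assumptions, not taken from the PR) is to time CPU-bound submissions and watch 
how many threads stay alive:

{code:python}
import threading
import time
from concurrent import futures


def measure(executor, tasks=10000):
  # Submit cheap CPU-bound work and report wall time plus peak thread count.
  peak_threads = 0
  start = time.time()
  for i in range(tasks):
    executor.submit(hash, i)
    peak_threads = max(peak_threads, threading.active_count())
  executor.shutdown(wait=True)
  return time.time() - start, peak_threads


if __name__ == '__main__':
  # Swap in UnboundedThreadPoolExecutor (with and without the patch) to compare.
  elapsed, peak = measure(futures.ThreadPoolExecutor(max_workers=12))
  print('took %.3fs, peak threads: %d' % (elapsed, peak))
{code}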

> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---
>
> Key: BEAM-8944
> URL: https://issues.apache.org/jira/browse/BEAM-8944
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-harness
>Affects Versions: 2.18.0
>Reporter: Yichi Zhang
>Priority: Critical
> Attachments: checkpoint-duration.png, profiling.png, 
> profiling_one_thread.png, profiling_twelve_threads.png
>
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> We are seeing a performance degradation for python streaming word count load 
> tests.
>  
> After some investigation, it appears to be caused by swapping the original 
> ThreadPoolExecutor for UnboundedThreadPoolExecutor in the SDK worker. The 
> suspicion is that Python performance is worse with more threads on CPU-bound tasks.
>  
> A simple test comparing the performance of the different thread pool executors:
>  
> {code:python}
> def test_performance(self):
>   def run_perf(executor):
>     total_number = 100
>     q = queue.Queue()
>
>     def task(number):
>       hash(number)
>       q.put(number + 200)
>       return number
>
>     t = time.time()
>     count = 0
>     for i in range(200):
>       q.put(i)
>     while count < total_number:
>       executor.submit(task, q.get(block=True))
>       count += 1
>     print('%s uses %s' % (executor, time.time() - t))
>
>   with UnboundedThreadPoolExecutor() as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=1) as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=12) as executor:
>     run_perf(executor)
> {code}
> Results:
> UnboundedThreadPoolExecutor uses 268.160675049
> ThreadPoolExecutor(max_workers=1) uses 79.904583931
> ThreadPoolExecutor(max_workers=12) uses 191.179054976
> Profiling:
> UnboundedThreadPoolExecutor:
>  !profiling.png! 
> 1 Thread ThreadPoolExecutor:
>  !profiling_one_thread.png! 
> 12 Threads ThreadPoolExecutor:
>  !profiling_twelve_threads.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-05-01 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17097302#comment-17097302
 ] 

Maximilian Michels commented on BEAM-8944:
--

[~lcwik] For the time being, do you think we could support two modes?

1) dynamic thread allocation 
2) a static number of threads

The second mode could be removed once we can ensure that the dynamic mode 
performs as efficiently as the static one. We would default to mode (1). What 
do you think?
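
To make that concrete, a rough sketch of where the switch could live when the 
SDK worker builds its executor (the option name and helper are hypothetical, 
and the import path for UnboundedThreadPoolExecutor is an assumption):

{code:python}
from concurrent import futures

from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor


def create_task_executor(static_worker_count=None):
  # Hypothetical switch between the two proposed modes.
  if static_worker_count:
    # Mode (2): a static number of threads, kept as a temporary escape hatch.
    return futures.ThreadPoolExecutor(max_workers=static_worker_count)
  # Mode (1): dynamic thread allocation, the default.
  return UnboundedThreadPoolExecutor()
{code}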



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-04-03 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074641#comment-17074641
 ] 

Maximilian Michels commented on BEAM-8944:
--

Thank you for your thoughts, Luke. I agree that the best approach is to work on 
a solution based on profiling the current code.

Here is my attempt to write a benchmark that most closely resembles our setup. 
There are at most two active bundles per SDK harness (remember, we use 16 of 
them and distribute bundles round-robin), hence two active tasks. This is 
adjustable in the {{run_benchmark}} method:

{code:python}
import time
from concurrent import futures
from future.moves import queue
from utils.thread_pool_executor import UnboundedThreadPoolExecutor
import cProfile as profiler


def run_benchmark(executor,
                  max_concurrent_threads=2,
                  total_iterations=10000,
                  warmup_iterations=100):
  q = queue.Queue()
  for i in range(max_concurrent_threads):
    q.put(i)

  def task(number):
    q.put(task)
    return number

  count = 0
  start_time = None
  profile = profiler.Profile()

  while count < total_iterations:
    if count == warmup_iterations:
      start_time = time.time()
      profile.enable()
    executor.submit(task, q.get(block=True))
    count += 1

  profile.disable()
  print('%s uses %s' % (executor, time.time() - start_time))
  profile.print_stats(sort='time')


if __name__ == '__main__':
  with UnboundedThreadPoolExecutor() as executor:
    run_benchmark(executor)
  with futures.ThreadPoolExecutor(max_workers=1) as executor:
    run_benchmark(executor)
  with futures.ThreadPoolExecutor(max_workers=12) as executor:
    run_benchmark(executor)
{code}

The results clearly reveal the cost of the current implementation:

{noformat}
<UnboundedThreadPoolExecutor> uses 7.32124900818
         575749 function calls in 7.302 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    83878    6.701    0.000    6.701    0.000 {method 'acquire' of 'thread.lock' objects}
    19800    0.087    0.000    6.958    0.000 Queue.py:150(get)
     8595    0.064    0.000    6.662    0.001 threading.py:309(wait)
    29698    0.062    0.000    0.157    0.000 threading.py:373(notify)
     9903    0.045    0.000    0.086    0.000 threading.py:260(__init__)
     9900    0.043    0.000    0.481    0.000 thread_pool_executor.py:134(submit)
     9899    0.040    0.000    0.176    0.000 thread_pool_executor.py:103(accepted_work)
    38293    0.028    0.000    0.099    0.000 threading.py:300(_is_owned)
     9899    0.025    0.000    0.136    0.000 threading.py:576(set)
     9900    0.020    0.000    0.020    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.020    0.000    0.117    0.000 _base.py:318(__init__)
     9900    0.014    0.000    0.024    0.000 threading.py:132(__init__)
    38294    0.013    0.000    0.013    0.000 threading.py:64(_note)
    28394    0.013    0.000    0.018    0.000 Queue.py:200(_qsize)
    19806    0.012    0.000    0.012    0.000 threading.py:59(__init__)
    19799    0.012    0.000    0.014    0.000 Queue.py:208(_get)
     9899    0.011    0.000    0.076    0.000 threading.py:400(notifyAll)
     9903    0.011    0.000    0.098    0.000 threading.py:242(Condition)
     8595    0.010    0.000    0.053    0.000 threading.py:297(_acquire_restore)
    18499    0.009    0.000    0.009    0.000 {thread.allocate_lock}
     9900    0.009    0.000    0.033    0.000 threading.py:114(RLock)
    38294    0.009    0.000    0.009    0.000 {method 'release' of 'thread.lock' objects}
     9900    0.007    0.000    0.007    0.000 thread_pool_executor.py:34(__init__)
     9900    0.007    0.000    0.026    0.000 threading.py:285(__enter__)
    38293    0.007    0.000    0.007    0.000 {len}
     9900    0.007    0.000    0.008    0.000 threading.py:288(__exit__)
     9899    0.004    0.000    0.004    0.000 {method 'remove' of 'list' objects}
     8595    0.004    0.000    0.006    0.000 threading.py:294(_release_save)
     8595    0.003    0.000    0.003    0.000 {method 'append' of 'list' objects}
    19799    0.003    0.000    0.003    0.000 {method 'popleft' of 'collections.deque' objects}
     9900    0.002    0.000    0.002    0.000 {method '__exit__' of 'thread.lock' objects}
        1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
        1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:717(start)
        2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
        1    0.000    0.000    0.000    0.000 threading.py:597(wait)
        1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
        1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
{noformat}

[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-04-02 Thread Luke Cwik (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073871#comment-17073871
 ] 

Luke Cwik commented on BEAM-8944:
-

I was having difficulty producing a solution that both creates threads when 
needed and is able to clean up threads without emptying the pool completely. 
Some ideas I had were:
* Have a base pool of threads that never "exit" unless shutdown happens (a 
minimal sketch of this idea follows below)
* Have a dedicated thread that checks the queue size every N secs and generates 
new threads for what is outstanding (lag in creation; maybe a non-issue if the 
base pool size is something like 8-16 threads)
* Whenever a worker gets an item from the queue, it checks the queue size and 
spawns a new worker thread if any items remain (will get stuck if **work** 
never completes)
* Only create threads on new submissions, with a certain amount of lag between 
them.

All of these are worse than the idle worker queue. It would be good to know 
what was expensive in that solution so that we could address it directly, since 
the cost might just be coming from the "timed" wait aspect.
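
For illustration, a minimal sketch of the first idea, i.e. a base set of 
threads that only exits on shutdown (futures, error handling, and growth beyond 
the base set are deliberately left out, and all names are made up):

{code:python}
import queue  # `Queue` on Python 2
import threading


class BasePool(object):
  """Sketch of a base pool of threads that only exit when shut down."""

  def __init__(self, base_threads=8):
    self._work = queue.Queue()
    self._threads = [
        threading.Thread(target=self._worker) for _ in range(base_threads)]
    for thread in self._threads:
      thread.daemon = True
      thread.start()

  def submit(self, fn, *args):
    self._work.put((fn, args))

  def _worker(self):
    while True:
      item = self._work.get(block=True)  # plain blocking get, no timed wait
      if item is None:  # shutdown sentinel
        return
      fn, args = item
      fn(*args)

  def shutdown(self):
    for _ in self._threads:
      self._work.put(None)
    for thread in self._threads:
      thread.join()
{code}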



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-04-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073657#comment-17073657
 ] 

Maximilian Michels commented on BEAM-8944:
--

My first thought was to change the implementation to allow two worker thread 
allocation methods: (a) a static number of threads and (b) dynamically created 
worker threads. However, I don't like that the user has to worry about 
configuring such things. I very much prefer the self-tuning capabilities that 
we added with UnboundedThreadPoolExecutor. We just have to make sure there is 
no performance regression.

Thanks [~lcwik]. The implementation looks much simpler. I wonder, could we keep 
the thread timeout feature? I think it should be possible to build this without 
using threading.Event and threading.Condition. For example, we could add a 
check after distributing work that looks at a worker thread's recent activity 
and terminates the thread if it hasn't been active for a while.
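
One way to get a similar effect without explicit Event/Condition bookkeeping in 
the pool itself (a rough sketch under my own assumptions, not the actual patch) 
is to let each worker do a timed Queue.get and exit once it has been idle for 
too long:

{code:python}
import queue  # `Queue` on Python 2
import time

_IDLE_TIMEOUT_SECS = 10  # illustrative value


def worker_loop(work_queue):
  """Hypothetical worker loop that lets an idle thread die after a timeout."""
  last_active = time.time()
  while True:
    try:
      fn, args = work_queue.get(timeout=1)
    except queue.Empty:
      if time.time() - last_active > _IDLE_TIMEOUT_SECS:
        return  # idle for too long: let this thread exit
      continue
    fn(*args)
    last_active = time.time()
{code}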



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-04-01 Thread Luke Cwik (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073142#comment-17073142
 ] 

Luke Cwik commented on BEAM-8944:
-

I took another look at the implementation of the UnboundedThreadPoolExecutor to 
see if I could reduce the overhead and got to 
https://github.com/lukecwik/incubator-beam/commit/4cabd7e11c35b67901ba19a55d7f8bc6a585be3b, 
but it doesn't have the same strict guarantees that a thread will exit after 
getting no work for a certain amount of time and that a new thread will be 
created when necessary.



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-04-01 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072860#comment-17072860
 ] 

Maximilian Michels commented on BEAM-8944:
--

I see that the usage pattern in the description is a bit different, but since 
the regression is caused by the new thread pool logic, I posted my findings 
here.

{quote}
Is Flink now able to schedule much more work since the Python SDK harness will 
accept an unbounded number of concurrent bundles due to the UnboundedThreadPool, 
vs the fixed number it supported before (so increased contention)?
{quote}

We are not primarily using the worker threads for parallelism. Instead, we use 
16 separate environments per Flink task manager, which are served with work 
round-robin. In this particular pipeline there are 12 task slots per task 
manager and at most two fused stages per task slot. That makes for a maximum of 
24 stages distributed across the 16 environments, so some environments will 
spawn more than one worker thread. I've tried increasing the lifetime of the 
threads, but that didn't have an effect.
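
To make the arithmetic concrete (a throwaway illustration, not Beam code): 
distributing the 24 stages round-robin over the 16 environments leaves 8 
environments serving two stages each, so those environments need more than one 
worker thread.

{code:python}
from collections import Counter

environments = 16
stages = 12 * 2  # 12 task slots, at most two fused stages per slot

# Round-robin assignment of stages to environments.
stages_per_env = Counter(stage % environments for stage in range(stages))
print(sorted(stages_per_env.values()))
# [1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2]
{code}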

{quote}
Is it possible that ThreadPoolExecutor has a C++ implementation which is making 
up for the difference?
{quote}

Possibly, I suppose we need to profile the execution.

{quote}
Do you have timing information between the two runs that compares the two 
implementations?
Do you have a graph showing how many threads are alive in the Python process 
between the two runs?
{quote}

I don't have such data yet. I only have the repeatable data for checkpointing. 

Note: I ran this on both Python 2.7.6 and Python 3.6.8, which showed the same 
behavior.





[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-04-01 Thread Luke Cwik (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072842#comment-17072842
 ] 

Luke Cwik commented on BEAM-8944:
-

The Python 3.5 implementation of the ThreadPoolExecutor: 
https://github.com/python/cpython/blob/3.5/Lib/concurrent/futures/thread.py
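
For reference, its worker loop boils down to roughly the following (a 
simplified paraphrase, not the actual source): workers block indefinitely on a 
plain queue and only exit when a shutdown sentinel arrives, so there are no 
timed condition waits in the hot path.

{code:python}
import queue
import threading


def _worker(work_queue):
  # Simplified paraphrase of concurrent.futures.thread._worker in Python 3.5.
  while True:
    work_item = work_queue.get(block=True)
    if work_item is None:  # shutdown sentinel
      work_queue.put(None)  # propagate so the next worker can exit too
      return
    work_item()  # the real code calls work_item.run()


def demo_task():
  print('ran on a pool thread')


if __name__ == '__main__':
  q = queue.Queue()
  t = threading.Thread(target=_worker, args=(q,))
  t.start()
  q.put(demo_task)
  q.put(None)  # shut down
  t.join()
{code}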



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-04-01 Thread Luke Cwik (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072840#comment-17072840
 ] 

Luke Cwik commented on BEAM-8944:
-

So the original analysis in the description of the bug isn't typical usage, 
because it compares creating `M` threads that each process a single item vs 
creating a fixed number of threads that then process the items off a queue. It 
is easy to see that the performance difference will come down to the number of 
threads created. In typical usage, threads are expected to be reused many times 
before being garbage collected.

Max, for your analysis in the above comment:
Is Flink now able to schedule much more work since the Python SDK harness will 
accept an unbounded number of concurrent bundles due to the UnboundedThreadPool, 
vs the fixed number it supported before (so increased contention)?
Is it possible that ThreadPoolExecutor has a C++ implementation which is making 
up for the difference?
Do you have timing information between the two runs that compares the two 
implementations?
Do you have a graph showing how many threads are alive in the Python process 
between the two runs?



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-04-01 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072693#comment-17072693
 ] 

Maximilian Michels commented on BEAM-8944:
--

 !checkpoint-duration.png! 

yellow: 2.18.0 as released
turquoise: 2.18.0 with BEAM-8944 reverted



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-03-31 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072103#comment-17072103
 ] 

Maximilian Michels commented on BEAM-8944:
--

Essentially, yes. This is especially a concern for latency, because Flink has 
to hold back in-flight elements while performing the checkpoint alignment of 
the operators. It appears the alignment is off due to the Python bundles not 
completing in time. I'm not sure why that is the case. We use the 
load-balancing feature of DefaultJobBundleFactory, where we use 16 Python 
environments and round-robin them.



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2020-03-31 Thread Yichi Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071998#comment-17071998
 ] 

Yichi Zhang commented on BEAM-8944:
---

[~mxm] Does the lower checkpoint duration mean that the FlinkRunner is faster 
without the UnboundedThreadPoolExecutor?



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2019-12-21 Thread Yichi Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001802#comment-17001802
 ] 

Yichi Zhang commented on BEAM-8944:
---

I think so



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2019-12-20 Thread Ahmet Altay (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001589#comment-17001589
 ] 

Ahmet Altay commented on BEAM-8944:
---

Could this be closed after the cherry-pick PR 
([https://github.com/apache/beam/pull/10430])?



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2019-12-19 Thread Yichi Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000448#comment-17000448
 ] 

Yichi Zhang commented on BEAM-8944:
---

Then yeah, it'll affect Python streaming jobs (which only run on the portable 
runner with the FnAPI).



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2019-12-19 Thread Udi Meiri (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000445#comment-17000445
 ] 

Udi Meiri commented on BEAM-8944:
-

The point of my question is to figure out whether this issue is a 2.18 blocker 
or not, and how it affects users.



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2019-12-19 Thread Udi Meiri (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000443#comment-17000443
 ] 

Udi Meiri commented on BEAM-8944:
-

By "current" I don't mean the current Beam release but the current 
production-ready runners (for example, IIUC portability on Dataflow is not 
production-ready).



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2019-12-19 Thread Udi Meiri (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000313#comment-17000313
 ] 

Udi Meiri commented on BEAM-8944:
-

Does this bug affect current production runners (such as the Dataflow runner 
mentioned in the TODO in #10387)?



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2019-12-11 Thread Yichi Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16994104#comment-16994104
 ] 

Yichi Zhang commented on BEAM-8944:
---

It seems the original change was made to solve deadlock and stuckness issues.

While the usage of UnboundedThreadPoolExecutor does seem to impact pipelines 
that saturate CPU usage (~90%) quite a bit, it has less effect on lightly 
loaded pipelines.



[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

2019-12-10 Thread Yichi Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993141#comment-16993141
 ] 

Yichi Zhang commented on BEAM-8944:
---

CC: [~angoenka]
