[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

2019-06-19 Thread Ash Berlin-Taylor (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-4401:
---
Fix Version/s: (was: 2.0.0)

> multiprocessing.Queue.empty() is unreliable
> ---
>
> Key: AIRFLOW-4401
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Jarek Potiuk
>Priority: Major
> Fix For: 1.10.4
>
>
> After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
> LocalExecutor tests (documented for example in AIRFLOW-4382), I took a deeper 
> dive into the problem and what I found raised the remaining hair on top of my 
> head. 
> We had a number of flaky tests in the local executor that resulted in 
> result_queue not being empty where it should have been emptied a moment 
> before. More details and discussion can be found in 
> [https://github.com/apache/airflow/pull/5159] 
> The problem turned out to be the unreliability of the
> multiprocessing.Queue.empty() implementation. It is not fully synchronized
> and it might return True even if a put() operation has already completed in
> another process. What's more - empty() might return True even if qsize() of
> the queue returns > 0 (!)
> It's a bit mind-boggling but it is "as intended", as documented in
> [https://bugs.python.org/issue23582] (resolved as "not a bug"), and it
> is described in
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues]
> when you go into the details of how data is synchronized between processes.
> A few people have stumbled upon this problem. For example
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  and [https://github.com/keras-team/autokeras/issues/368]
> We also seem to have experienced this in Airflow before. In jobs.py years ago
> (31.07.2016) we can see the comment below (but we used
> multiprocessing.Queue.empty() nevertheless):
> {code:java}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?){code}
> The solution available in [https://bugs.python.org/issue23582] using qsize()
> worked on Linux but is not really acceptable because qsize() does not
> work on macOS (it raises NotImplementedError).
>  
> *Proposed solution 1) Synchronized Queue*
> [https://github.com/apache/airflow/pull/5199]
> Implement a more reliable queue (SynchronizedQueue) based on
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  (but we have to adjust the initialisation to match 2.7 and 3.5+ syntax,
> since we want to backport to the stable v1.10 release).
> We should replace all usages of multiprocessing.Queue where empty() is used
> with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
> in a similar way in the future.
> Pros:
>  * rather straightforward replacement of queue -> SynchronizedQueue
>  * no extra processes needed - queues continue to be distributed without 
> central manager
>  * no need to cleanup the processes
> Cons:
>  * potential synchronization delays (likely negligible)
>  * we are adding our own SynchronizedQueue with slightly altered behaviour - 
> more code to manage
>  * the SynchronizedQueue implementation is still not fully reliable - you can
> have cases where empty() returns False but get_nowait() raises the Empty
> exception. This means that everywhere we depend on empty() returning False
> we have to use a potentially blocking get() to retrieve data
>  * Requires (simple) backporting to python 2 for the v1-10 branch
> *Proposed solution 2): Use managed queues*
> [https://github.com/apache/airflow/pull/5200]
> It seems that this unreliable behaviour of Queue only happens if the Queue
> is instantiated directly, and the small delays between processes are gone
> when a shared Manager is used. In that case the Queue is really a proxy to a
> central Queue object started in a separate process - thus synchronisation is
> implemented fully via this single central queue:
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] .
> Using managed queues should solve the problem.
> Observation from tests confirms that this is the case and the tests are not 
> flaky any more when managed queues are used.
> Pros:
>  * Only initialisation of queues needs to be changed - no need to extend 
> Queue implementation
>  * Pythonic way - managers are part of standard library and we can assume 
> they are reliable and tested
>  * Such a managed queue is fully reliable - empty() and get_nowait() are
> perfectly in sync.
>  * Works the same for python 2/python 3
> Cons:
>  * potential synchronization delays (likely 

[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

2019-04-29 Thread Jarek Potiuk (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jarek Potiuk updated AIRFLOW-4401:
--
Description: 
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests (documented for example in AIRFLOW-4382), I took a deeper 
dive into the problem and what I found raised the remaining hair on top of my 
head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be the unreliability of the
multiprocessing.Queue.empty() implementation. It is not fully synchronized and
it might return True even if a put() operation has already completed in
another process. What's more - empty() might return True even if qsize() of
the queue returns > 0 (!)
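
One way to avoid depending on empty() at all is to drain the queue with a timed get() and treat queue.Empty as the end of the data. This is only a minimal sketch: drain is a hypothetical helper (not Airflow code) and the timeout value is an assumption. A producer slower than the timeout can still be missed, so a sentinel value is more robust where the producers are known.

```python
import multiprocessing
import queue


def drain(q, timeout=1.0):
    """Collect items until no new item arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            # A timed get() waits for data still in flight between processes,
            # which a bare empty() check does not do.
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items
```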

It's a bit mind-boggling but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug"), and it is
described in
[https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] when
you go into the details of how data is synchronized between processes.

A few people have stumbled upon this problem. For example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368]

We also seem to have experienced this in Airflow before. In jobs.py years ago
(31.07.2016) we can see the comment below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize()
worked on Linux but is not really acceptable because qsize() does not work
on macOS (it raises NotImplementedError).
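
The portability problem can be illustrated with a small guard around qsize(). This is a sketch only: safe_qsize is a hypothetical helper name, and returning None where qsize() is unsupported is an assumption made for illustration.

```python
import multiprocessing


def safe_qsize(q):
    """Return the approximate queue size, or None where qsize() is unsupported."""
    try:
        return q.qsize()
    except NotImplementedError:
        # Raised on platforms such as macOS, where the semaphore value
        # cannot be read.
        return None
```

Callers still need a fallback for the None case, which is why qsize() alone does not solve the problem.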

 

*Proposed solution 1) Synchronized Queue*

[https://github.com/apache/airflow/pull/5199]

Implement a more reliable queue (SynchronizedQueue) based on
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but we have to adjust the initialisation to match 2.7 and 3.5+ syntax, since
we want to backport to the stable v1.10 release).

We should replace all usages of multiprocessing.Queue where empty() is used
with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
in a similar way in the future.
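
The general idea of a counter-backed queue can be sketched as follows. This is not the code from the linked commit or from the pull request: CountingQueue is a hypothetical name, and wrapping the queue (instead of subclassing it) is an assumption made to keep the example short. The counter is incremented before put(), so empty() can return False slightly before the item is retrievable.

```python
import multiprocessing


class CountingQueue:
    """Hypothetical wrapper: a multiprocessing.Queue plus a shared item counter."""

    def __init__(self, maxsize=0):
        self._queue = multiprocessing.Queue(maxsize)
        # Shared integer with its own lock, used instead of Queue.qsize().
        self._count = multiprocessing.Value("i", 0)

    def put(self, item, block=True, timeout=None):
        # Count first, so empty() never reports True for an item already put.
        with self._count.get_lock():
            self._count.value += 1
        try:
            self._queue.put(item, block, timeout)
        except Exception:
            # Undo the count if the item could not be queued.
            with self._count.get_lock():
                self._count.value -= 1
            raise

    def get(self, block=True, timeout=None):
        item = self._queue.get(block, timeout)
        with self._count.get_lock():
            self._count.value -= 1
        return item

    def qsize(self):
        return self._count.value

    def empty(self):
        return self.qsize() == 0
```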

Pros:
 * rather straightforward replacement of queue -> SynchronizedQueue
 * no extra processes needed - queues continue to be distributed without 
central manager
 * no need to cleanup the processes

Cons:
 * potential synchronization delays (likely negligible)
 * we are adding our own SynchronizedQueue with slightly altered behaviour - 
more code to manage
 * the SynchronizedQueue implementation is still not fully reliable - you can
have cases where empty() returns False but get_nowait() raises the Empty
exception. This means that everywhere we depend on empty() returning False we
have to use a potentially blocking get() to retrieve data
 * Requires (simple) backporting to python 2 for the v1-10 branch

*Proposed solution 2): Use managed queues*

[https://github.com/apache/airflow/pull/5200]

It seems that this unreliable behaviour of Queue only happens if the Queue is
instantiated directly, and the small delays between processes are gone when a
shared Manager is used. In that case the Queue is really a proxy to a central
Queue object started in a separate process - thus synchronisation is
implemented fully via this single central queue:
[https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] .
Using managed queues should solve the problem.

Observation from tests confirms that this is the case and the tests are not 
flaky any more when managed queues are used.
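
A minimal sketch of the managed-queue approach, using only the standard library. The function name drain_managed_queue is hypothetical, and the example runs in a single client process purely for illustration. It also shows the cleanup ordering: the queue is drained before the manager is shut down.

```python
import multiprocessing
import queue


def drain_managed_queue(items):
    """Put items on a manager-backed queue and read them back."""
    # Manager() starts a separate server process that owns the real queue;
    # `managed` is only a proxy to it.
    with multiprocessing.Manager() as manager:
        managed = manager.Queue()
        for item in items:
            managed.put(item)
        results = []
        # Drain before the manager shuts down: once it is gone, the proxy
        # can no longer reach the queue.
        while True:
            try:
                results.append(managed.get_nowait())
            except queue.Empty:
                break
        return results


if __name__ == "__main__":
    print(drain_managed_queue(["a", "b"]))
```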

Pros:
 * Only initialisation of queues needs to be changed - no need to extend Queue 
implementation
 * Pythonic way - managers are part of standard library and we can assume they 
are reliable and tested
 * Such a managed queue is fully reliable - empty() and get_nowait() are
perfectly in sync.
 * Works the same for python 2/python 3

Cons:
 * potential synchronization delays (likely negligible)
 * since we have a separate process started for each manager, cleanup is
necessary and it is quite delicate, because shutting down the manager prevents
access to the queue (Broken Pipe errors). Therefore the sequence of cleanup is
important: first process everything, then clean up. This might have
some undesirable side effects when shutting down Schedulers/Workers

  was:
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests (documented for example in AIRFLOW-4382), 

[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

2019-04-29 Thread Jarek Potiuk (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jarek Potiuk updated AIRFLOW-4401:
--
Description: 
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests (documented for example in AIRFLOW-4382), I took a deeper 
dive into the problem and what I found raised the remaining hair on top of my 
head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be the unreliability of the
multiprocessing.Queue.empty() implementation. It is not fully synchronized and
it might return True even if a put() operation has already completed in
another process. What's more - empty() might return True even if qsize() of
the queue returns > 0 (!)

It's a bit mind-boggling but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug"), and it is
described in
[https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] when
you go into the details of how data is synchronized between processes.

A few people have stumbled upon this problem. For example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368]

We also seem to have experienced this in Airflow before. In jobs.py years ago
(31.07.2016) we can see the comment below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize()
worked on Linux but is not really acceptable because qsize() does not work
on macOS (it raises NotImplementedError).

 

*Proposed solution 1) Synchronized Queue*

[https://github.com/apache/airflow/pull/5199]

Implement a more reliable queue (SynchronizedQueue) based on
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but we have to adjust the initialisation to match 2.7 and 3.5+ syntax, since
we want to backport to the stable v1.10 release).

We should replace all usages of multiprocessing.Queue where empty() is used
with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
in a similar way in the future.

Pros:
 * rather straightforward replacement of queue -> SynchronizedQueue
 * no extra processes needed - queues continue to be distributed without 
central manager
 * no need to cleanup the processes

Cons:
 * potential synchronization delays (likely negligible)
 * we are adding our own SynchronizedQueue with slightly altered behaviour - 
more code to manage
 * the SynchronizedQueue implementation is still not fully reliable - you can
have cases where empty() returns False but get_nowait() raises the Empty
exception. This means that everywhere we depend on empty() returning False we
have to use a potentially blocking get() to retrieve data

*Proposed solution 2): Use managed queues*

[https://github.com/apache/airflow/pull/5200]

It seems that this unreliable behaviour of Queue only happens if the Queue is
instantiated directly, and the small delays between processes are gone when a
shared Manager is used. In that case the Queue is really a proxy to a central
Queue object started in a separate process - thus synchronisation is
implemented fully via this single central queue:
[https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] .
Using managed queues should solve the problem.

Observation from tests confirms that this is the case and the tests are not 
flaky any more when managed queues are used.

Pros:
 * Only initialisation of queues needs to be changed - no need to extend Queue 
implementation
 * Pythonic way - managers are part of standard library and we can assume they 
are reliable and tested
 * Such a managed queue is fully reliable - empty() and get_nowait() are
perfectly in sync.

Cons:
 * potential synchronization delays (likely negligible)
 * since we have a separate process started for each manager, cleanup is
necessary and it is quite delicate, because shutting down the manager prevents
access to the queue (Broken Pipe errors). Therefore the sequence of cleanup is
important: first process everything, then clean up. This might have
some undesirable side effects when shutting down Schedulers/Workers

  was:
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests, I took a deeper dive into the problem and what I found 
raised the remaining hair on top of my head. 

We had a number of flaky tests in the local 

[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

2019-04-28 Thread Jarek Potiuk (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jarek Potiuk updated AIRFLOW-4401:
--
Description: 
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests, I took a deeper dive into the problem and what I found 
raised the remaining hair on top of my head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be the unreliability of the
multiprocessing.Queue.empty() implementation. It is not fully synchronized and
it might return True even if a put() operation has already completed in
another process. What's more - empty() might return True even if qsize() of
the queue returns > 0 (!)

It's a bit mind-boggling but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug"), and it is
described in
[https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] when
you go into the details of how data is synchronized between processes.

A few people have stumbled upon this problem. For example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368]

We also seem to have experienced this in Airflow before. In jobs.py years ago
(31.07.2016) we can see the comment below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize()
worked on Linux but is not really acceptable because qsize() does not work
on macOS (it raises NotImplementedError).

 

The proposed solution 1):

Implement a more reliable queue (SynchronizedQueue) based on
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but we have to adjust the initialisation to match 2.7 and 3.5+ syntax, since
we want to backport to the stable v1.10 release).

We should replace all usages of multiprocessing.Queue where empty() is used
with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
in a similar way in the future.

The solution 2):

It seems that this unreliable behaviour of Queue only happens if the Queue is
instantiated directly, and the small delays between processes are gone when a
shared Manager is used (because then the Queue is a proxy to a central Queue
started in a separate process - thus synchronisation is implemented in this
single queue:
[https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] ).
Switching to a managed queue should solve the problem as well.

 

 

  was:
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests, I took a deeper dive into the problem and what I found 
raised the remaining hair on top of my head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be ... the unreliability of the
multiprocessing.Queue.empty() implementation. It is not fully synchronized and
it might return True even if a put() operation has already completed in
another process. What's more - empty() might return True even if qsize() of
the queue returns > 0 (!)

It's a bit mind-boggling but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug")

A few people have stumbled upon this problem. For example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368]

We also seem to have experienced this in Airflow before. In jobs.py years ago
(31.07.2016) we can see the comment below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize()
worked on Linux but is not really acceptable because qsize() does not work
on macOS (it raises NotImplementedError).

The solution 1)

Implement a reliable queue (SynchronizedQueue) based on
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but with a twist that __init__ of class deriving from Queue has 

[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

2019-04-24 Thread Ash Berlin-Taylor (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-4401:
---
Fix Version/s: (was: 2.0.0)
   1.10.4

[~higrys] I plan on pulling this in to 1.10.4 anyway :D 

> multiprocessing.Queue.empty() is unreliable
> ---
>
> Key: AIRFLOW-4401
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Jarek Potiuk
>Priority: Major
> Fix For: 1.10.4
>
>
> After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
> LocalExecutor tests, I took a deeper dive into the problem and what I found 
> raised the remaining hair on top of my head. 
> We had a number of flaky tests in the local executor that resulted in 
> result_queue not being empty where it should have been emptied a moment 
> before. More details and discussion can be found in 
> [https://github.com/apache/airflow/pull/5159] 
> The problem turned out to be ... the unreliability of the
> multiprocessing.Queue.empty() implementation. It is not fully synchronized
> and it might return True even if a put() operation has already completed in
> another process. What's more - empty() might return True even if qsize() of
> the queue returns > 0 (!)
> It's a bit mind-boggling but it is "as intended", as documented in
> [https://bugs.python.org/issue23582] (resolved as "not a bug")
> A few people have stumbled upon this problem. For example
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  and [https://github.com/keras-team/autokeras/issues/368]
> We also seem to have experienced this in Airflow before. In jobs.py years ago
> (31.07.2016) we can see the comment below (but we used
> multiprocessing.Queue.empty() nevertheless):
> {code:java}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?){code}
> The solution available in [https://bugs.python.org/issue23582] using qsize()
> worked on Linux but is not really acceptable because qsize() does not
> work on macOS (it raises NotImplementedError).
> The working solution is to implement a reliable queue (SynchronizedQueue)
> based on
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  (but with a twist: the __init__ of a class deriving from Queue has to be
> changed for Python 3.4+, as described in
> [https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).
> Luckily we are now on Python 3.5+
> We should replace all usages of multiprocessing.Queue where empty() is used
> with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
> in a similar way in the future.
>  
>  
>  





[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

2019-04-23 Thread Jarek Potiuk (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jarek Potiuk updated AIRFLOW-4401:
--
Description: 
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests, I took a deeper dive into the problem and what I found 
raised the remaining hair on top of my head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be ... the unreliability of the
multiprocessing.Queue.empty() implementation. It is not fully synchronized and
it might return True even if a put() operation has already completed in
another process. What's more - empty() might return True even if qsize() of
the queue returns > 0 (!)

It's a bit mind-boggling but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug")

A few people have stumbled upon this problem. For example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368]

We also seem to have experienced this in Airflow before. In jobs.py years ago
(31.07.2016) we can see the comment below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize()
worked on Linux but is not really acceptable because qsize() does not work
on macOS (it raises NotImplementedError).

The working solution is to implement a reliable queue (SynchronizedQueue) based
on
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but with a twist: the __init__ of a class deriving from Queue has to be
changed for Python 3.4+, as described in
[https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).
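
The constructor change mentioned above can be sketched as follows. ContextQueue is a hypothetical class name used only to show the extra ctx argument; it is not the SynchronizedQueue from the pull request, and the example is Python 3 only.

```python
import multiprocessing
from multiprocessing.queues import Queue


class ContextQueue(Queue):
    """Hypothetical Queue subclass showing the Python 3.4+ constructor."""

    def __init__(self, maxsize=0):
        # Since Python 3.4 the base class requires a keyword-only `ctx`
        # argument; the default context is used here.
        super().__init__(maxsize, ctx=multiprocessing.get_context())
```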

Luckily we are now on Python 3.5+

We should replace all usages of multiprocessing.Queue where empty() is used
with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
in a similar way in the future.

 

 

 

  was:
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests, I took a deeper dive into the problem and what I found 
raised the remaining hair on top of my head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be ... the unreliability of the
multiprocessing.Queue.empty() implementation. It is not fully synchronized and
it might return True even if a put() operation has already completed in
another process. What's more - empty() might return True even if qsize() of
the queue returns > 0 (!)

It's a bit mind-boggling but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug")

A few people have stumbled upon this problem. For example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368]

We also seem to have experienced this in Airflow before. In jobs.py years ago
(31.07.2016) we can see the comment below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize()
was not usable because qsize() does not work on macOS (it raises
NotImplementedError).

The working solution is to implement a reliable queue (SynchronizedQueue) based
on
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but with a twist: the __init__ of a class deriving from Queue has to be
changed for Python 3.4+, as described in
[https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).

Luckily we are now on Python 3.5+

We should replace all usages of multiprocessing.Queue where empty() is used
with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
in a similar way in the future.

 

 

 


> multiprocessing.Queue.empty() is unreliable
> ---
>
> Key: AIRFLOW-4401
> URL: 

[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

2019-04-23 Thread Jarek Potiuk (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jarek Potiuk updated AIRFLOW-4401:
--
Description: 
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests, I took a deeper dive into the problem and what I found 
raised the remaining hair on top of my head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be ... the unreliability of the
multiprocessing.Queue.empty() implementation. It is not fully synchronized and
it might return True even if a put() operation has already completed in
another process. What's more - empty() might return True even if qsize() of
the queue returns > 0 (!)

It's a bit mind-boggling but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug")

A few people have stumbled upon this problem. For example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368]

We also seem to have experienced this in Airflow before. In jobs.py years ago
(31.07.2016) we can see the comment below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize()
was not usable because qsize() does not work on macOS (it raises
NotImplementedError).

The working solution is to implement a reliable queue (SynchronizedQueue) based
on
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but with a twist: the __init__ of a class deriving from Queue has to be
changed for Python 3.4+, as described in
[https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).

Luckily we are now on Python 3.5+

We should replace all usages of multiprocessing.Queue where empty() is used
with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
in a similar way in the future.

 

 

 

  was:
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests, I took a deeper dive into the problem and what I found 
raised the remaining hair on top of my head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be ... the unreliability of the
multiprocessing.Queue.empty() implementation. It is not fully synchronized and
it might return True even if a put() operation has already completed in
another process. What's more - empty() might return True even if qsize() of
the queue returns > 0 (!)

It's a bit mind-boggling but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug")

A few people have stumbled upon this problem. For example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368]

We also seem to have experienced this in Airflow before. In jobs.py years ago
(31.07.2016) we can see the comment below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize() 
was not usable because qsize() does not work on macOS (it raises 
NotImplementedError).

The working solution is to implement a reliable queue (SynchronizedQueue) based 
on 
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but with a twist: the __init__ of a class deriving from Queue has to be 
changed for Python 3.4+, as described in 
[https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).

Luckily we are now on Python 3.5+.

We should replace all usages of multiprocessing.Queue where empty() is used 
with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue 
in a similar way in the future.


> multiprocessing.Queue.empty() is unreliable
> ---
>
> Key: AIRFLOW-4401
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
> Project: Apache 

[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

2019-04-23 Thread Jarek Potiuk (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jarek Potiuk updated AIRFLOW-4401:
--
Description: 
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests, I took a deeper dive into the problem and what I found 
raised the remaining hair on top of my head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be ... unreliability of multiprocessing.Queue empty() 
implementation. It turned out that multiprocessing.Queue.empty() implementation 
is not fully synchronized and it might return True even if put() operation has 
been already completed in another process. What's more - empty() might return 
True even if qsize() of the queue returns > 0 (!) 
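
A small sketch of the effect described above, assuming only the documented behaviour that put() hands the object to a background feeder thread, so there may be a short delay before empty() returns False. It runs in a single process for simplicity; the function name count_stale_empty is hypothetical. The result is timing-dependent and may well be 0 on a given run:

```python
import multiprocessing


def count_stale_empty(iterations=1000):
    """Count how often empty() is still True right after put() returned."""
    q = multiprocessing.Queue()
    stale = 0
    for i in range(iterations):
        q.put(i)
        # put() has returned, but the feeder thread may not yet have
        # written the item to the underlying pipe.
        if q.empty():
            stale += 1
        # Blocking get() waits until the item has actually arrived.
        q.get()
    return stale


if __name__ == "__main__":
    print("stale empty() results:", count_stale_empty())
```

Any non-zero count shows empty() reporting True for a queue that already had a completed put(), which is the kind of flakiness seen in the LocalExecutor tests.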

It's a bit mind-boggling but it is "as intended", as documented in 
[https://bugs.python.org/issue23582] (resolved as "not a bug"). 

A few people have stumbled upon this problem. For example 
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368] 

We also seem to have experienced this in Airflow before. In jobs.py, years ago 
(31.07.2016), we can see the comment below (but we used 
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize() 
was not usable because qsize() does not work on macOS (it raises 
NotImplementedError).

The working solution is to implement a reliable queue (SynchronizedQueue) based 
on 
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but with a twist: the __init__ of a class deriving from Queue has to be 
changed for Python 3.4+, as described in 
[https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).

Luckily we are now on Python 3.5+.

We should replace all usages of multiprocessing.Queue where empty() is used 
with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue 
in a similar way in the future.

  was:
After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
LocalExecutor tests, I took a deeper dive into the problem and what I found 
raised the remaining hair on top of my head. 

We had a number of flaky tests in the local executor that resulted in 
result_queue not being empty where it should have been emptied a moment before. 
More details and discussion can be found in 
[https://github.com/apache/airflow/pull/5159] 

The problem turned out to be ... unreliability of multiprocessing.Queue empty() 
implementation. It turned out that multiprocessing.Queue.empty() 
implementation is not fully synchronized and it might return True even if put() 
operation has been already completed in another thread. What's more - empty() 
might return True even if qsize() of the queue returns > 0 (!) 

It's a bit mind-boggling but it is "as intended", as documented in 
[https://bugs.python.org/issue23582] (resolved as "not a bug"). 

A few people have stumbled upon this problem. For example 
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 and [https://github.com/keras-team/autokeras/issues/368] 

We also seem to have experienced this in Airflow before. In jobs.py, years ago 
(31.07.2016), we can see the comment below (but we used 
multiprocessing.Queue.empty() nevertheless):
{code:java}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The solution available in [https://bugs.python.org/issue23582] using qsize() 
was not usable because qsize() does not work on macOS (it raises 
NotImplementedError).

The working solution is to implement a reliable queue (SynchronizedQueue) based 
on 
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
 (but with a twist: the __init__ of a class deriving from Queue has to be 
changed for Python 3.4+, as described in 
[https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).

Luckily we are now on Python 3.5+.

We should replace all usages of multiprocessing.Queue where empty() is used 
with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue 
in a similar way in the future.


> multiprocessing.Queue.empty() is unreliable
> ---
>
> Key: AIRFLOW-4401
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
> Project: Apache 

[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

2019-04-23 Thread Jarek Potiuk (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jarek Potiuk updated AIRFLOW-4401:
--
Summary: multiprocessing.Queue.empty() is unreliable  (was: 
multiprocessing.Queue is unreliable)

> multiprocessing.Queue.empty() is unreliable
> ---
>
> Key: AIRFLOW-4401
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Jarek Potiuk
>Priority: Major
>
> After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
> LocalExecutor tests, I took a deeper dive into the problem and what I found 
> raised the remaining hair on top of my head. 
> We had a number of flaky tests in the local executor that resulted in 
> result_queue not being empty where it should have been emptied a moment 
> before. More details and discussion can be found in 
> [https://github.com/apache/airflow/pull/5159] 
> The problem turned out to be ... unreliability of multiprocessing.Queue 
> empty() implementation. It turned out that multiprocessing.Queue.empty() 
> implementation is not fully synchronized and it might return True even if 
> put() operation has been already completed in another thread. What's more - 
> empty() might return True even if qsize() of the queue returns > 0 (!) 
> It's a bit mind-boggling but it is "as intended", as documented in 
> [https://bugs.python.org/issue23582] (resolved as "not a bug"). 
> A few people have stumbled upon this problem. For example 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  and [https://github.com/keras-team/autokeras/issues/368] 
> We also seem to have experienced this in Airflow before. In jobs.py, years ago 
> (31.07.2016), we can see the comment below (but we used 
> multiprocessing.Queue.empty() nevertheless):
> {code:java}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?){code}
> The solution available in [https://bugs.python.org/issue23582] using qsize() 
> was not usable because qsize() does not work on macOS (it raises 
> NotImplementedError).
> The working solution is to implement a reliable queue (SynchronizedQueue) 
> based on 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  (but with a twist: the __init__ of a class deriving from Queue has to be 
> changed for Python 3.4+, as described in 
> [https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).
> Luckily we are now on Python 3.5+.
> We should replace all usages of multiprocessing.Queue where empty() is used 
> with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue 
> in a similar way in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)