I took your advice and I was able to rewrite the code so that I no longer 
need to use "wait()" or cancel tasks. 

Instead I am using a PriorityQueue for items and poison pills, a 
JoinableQueue for tokens that represent items in progress, and a 
BoundedSemaphore to bound the number of items in the queue while leaving 
the poison pills unbounded. (The changes are at 
https://github.com/chfoo/wpull/commit/28493a2ca4ba07688a62e36f5d88855c60338f7c
)
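
In case it helps anyone searching the archives later, the shape of the new 
design is roughly the sketch below. It is written with modern asyncio 
syntax (async/await) rather than the Trollius style wpull actually uses, 
and current asyncio folds JoinableQueue's task_done()/join() into Queue 
itself, so a plain Queue plays that role here. The names (ItemSession, 
POISON_PILL, MAX_IN_FLIGHT) are made up for the example rather than taken 
from wpull, and sorting the pills after real items is just one possible 
choice:

    import asyncio
    import itertools

    POISON_PILL = object()   # sentinel telling a consumer to exit
    MAX_IN_FLIGHT = 10       # bound on queued items; pills are not counted

    class ItemSession:
        def __init__(self):
            # Items and poison pills share one priority queue.
            self.item_queue = asyncio.PriorityQueue()
            # One token per outstanding item; join() waits for all of them.
            self.token_queue = asyncio.Queue()
            # Bounds items only; put_poison_pill() never acquires it.
            self.slots = asyncio.BoundedSemaphore(MAX_IN_FLIGHT)
            self._counter = itertools.count()  # tie-breaker for the heap

        async def put_item(self, priority, payload):
            await self.slots.acquire()         # blocks once the bound is hit
            self.token_queue.put_nowait(None)  # track the item until done
            await self.item_queue.put(
                (priority, next(self._counter), payload))

        async def put_poison_pill(self):
            # No semaphore acquire, so any number of pills can be queued.
            await self.item_queue.put(
                (float('inf'), next(self._counter), POISON_PILL))

        async def consumer(self):
            while True:
                _prio, _seq, payload = await self.item_queue.get()
                if payload is POISON_PILL:
                    return
                self.slots.release()           # free a slot for the producer
                try:
                    await asyncio.sleep(0.01)  # stand-in for real work
                finally:
                    self.token_queue.get_nowait()
                    self.token_queue.task_done()  # join() sees completion

    async def main():
        session = ItemSession()
        workers = [asyncio.create_task(session.consumer()) for _ in range(3)]
        for n in range(20):
            await session.put_item(1, f'item-{n}')
        await session.token_queue.join()       # wait until every item is done
        for _ in workers:
            await session.put_poison_pill()    # one pill per consumer
        await asyncio.gather(*workers)

    asyncio.run(main())

Using a BoundedSemaphore rather than a plain Semaphore means a mismatched 
release() raises ValueError instead of silently widening the bound, which 
is exactly the kind of bug this design is trying to keep out.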

So the good news is that I don't think I will ever need to use 
wait()/wait_for() for queues. I think it would be a good idea for the 
documentation to mention that wait()/wait_for() do not provide the usual 
guarantees when wrapped around synchronization primitives, and that code 
doing so should be rewritten.
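
For the record, the pattern I am suggesting the documentation warn about is 
roughly the first coroutine below, and the second is the 
get_nowait()/sleep() style I switched to instead. Both are contrived 
sketches in modern asyncio syntax, not code from wpull:

    import asyncio

    async def consumer_with_wait_for(queue):
        """Poll with a timeout: this is the racy pattern."""
        while True:
            try:
                # The timeout can cancel the inner get() even though the
                # future behind it is still going to receive an item, and
                # that item is then never seen by any consumer (the race
                # discussed in this thread).
                item = await asyncio.wait_for(queue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                continue                  # do periodic work, then poll again
            if item is None:
                return                    # sentinel: shut down
            print('processing', item)

    async def consumer_with_get_nowait(queue):
        """The workaround: a non-blocking get plus a short sleep."""
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.1)  # nothing queued; retry shortly
                continue
            if item is None:
                return
            print('processing', item)

The documentation note would essentially say: if you find yourself writing 
the first version, restructure the code instead.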

On Tuesday, August 19, 2014 9:45:32 PM UTC-4, Guido van Rossum wrote:
>
> Note: I haven't looked at the Tulip code yet (and won't have time until 
> next week -- I'm on a much-needed vacation).
>
> It looks possible that when you call .get() on a Queue wrapped in a 
> timeout (e.g. using wait_for()) you could see a timeout while the Future 
> representing the result is still going to get its result. This is because 
> wait_for() doesn't have very strong guarantees. In particular cancelling a 
> Task takes effect after some delay because the generator wrapped by the 
> Task must be given a chance to catch the cancellation with a try/except or 
> try/finally.
>
> Unfortunately it's hard to explain exactly how tasks are scheduled in 
> fewer words than the implementation -- the only easy property is that, as 
> long as you don't explicitly play with threads, task switches only happen 
> at "yield from" points -- but that doesn't help much because most 
> operations are implemented using a multitude of tasks. Also it looks like 
> you're using Trollius, and I don't actually understand the details of how 
> things work there.
>
> All in all it is still quite possible that there's a bug in your code, but 
> it feels likely that this is a race in our Queue API and we will have to 
> add a timeout to the Queue.get() and Queue.put() APIs to fix it. A third 
> possible outcome is that there may be a way to write your code that avoids 
> the race, and we may decide that we should just document that, because I 
> really don't want to have to add timeout parameters to every method. But if 
> Queues really are special enough we'll do it!
>
> Anyway, it would be well worth your while if you could reproduce this in a 
> simple test case (not using the rest of your own code) and file a bug (I 
> prefer bugs in the tulip project, but asyncio bugs filed in the Python 
> tracker will also reach me).
>
> On Tue, Aug 19, 2014 at 6:09 PM, Christopher Foo <[email protected]> wrote:
>
>> TL;DR: How do I use timeouts with "Queue.get()"?
>>
>> I am currently porting my web crawler from Tornado to Asyncio using 
>> Trollius. 
>>
>> I seem to have noticed that using ".wait()" or ".wait_for()" on 
>> "Queue.get()" will sometimes lose items. I assumed that since "Queue.get()" 
>> does not have a timeout parameter, "wait_for()" should be used.
>>
>> I first noticed that my unit tests would occasionally hang. In this 
>> situation, I am using a queue to send RPC messages to PhantomJS via stdin 
>> and stdout. I have a coroutine, run with ".async()", containing a while 
>> loop that consumes messages from the queue and sends them to PhantomJS. 
>> Since PhantomJS blocks on "readline()", I decided to time out every 0.1 
>> seconds. 
>>
>> Occasionally, a unit test would fail: the PhantomJS logs showed it reading 
>> every 0.1 seconds, but it never received a specific RPC message. I 
>> switched to "Queue.get_nowait()" and ".sleep()" (so no more ".wait_for()") 
>> and that seems to have solved the problem. You can see the changes at: 
>> https://github.com/chfoo/wpull/commit/5d52a144ec6a00e66b9409ec1524d4f87eeda4bb
>>
>> Now I have run into another situation where I am losing items. I have a 
>> producer-consumer setup where the number of consumers may change and the 
>> consumers implicitly produce items. In this case, I decided to use two 
>> queues: one for items (JoinableQueue) and one for poison pills (Queue). 
>> Each consumer calls ".wait()" with "FIRST_COMPLETED" on a "Queue.get()" 
>> from each queue. If it gets one result, it either processes the item or 
>> dies, and the other "get()" task is cancelled. If it gets both, it 
>> processes the item first and then dies. In either case, after processing 
>> the item, "JoinableQueue.task_done()" is called. 
>>
>> However, I seem to be losing items since the producer occasionally calls 
>> "JoinableQueue.join()" when it shouldn't. When there are no more items 
>> from the source, it checks whether any consumers are alive so it can 
>> restart producing, in case the consumers implicitly produce more items. 
>> But this join hangs forever because an item was lost and the consumers 
>> never saw it, so the number of unfinished items stays above 0. You can 
>> see the code at 
>> https://github.com/chfoo/wpull/blob/d255199506d69816fcd39f3eb950bdc106e510ae/wpull/engine.py#L100
>> .
>>
>> Am I misunderstanding the point of ".wait()"/".wait_for()"? What does a 
>> task have to do when it gets cancelled? Should I assume that all tasks are 
>> scheduled and running during ".wait()"? Am I supposed to assume that any 
>> cancelled task is a task that has completed but we are discarding its 
>> results? Why doesn't "Queue.get()" offer a timeout parameter?
>>
>> Thanks in advance.
>>
>>
>
>
> -- 
> --Guido van Rossum (python.org/~guido) 
>
