Note: I haven't looked at the Tulip code yet (and won't have time until
next week -- I'm on a much-needed vacation).

It looks possible that when you call .get() on a Queue wrapped in a timeout
(e.g. using wait_for()) you could see a timeout while the Future
representing the result is still going to get its result. This is because
wait_for() doesn't have very strong guarantees. In particular cancelling a
Task takes effect after some delay because the generator wrapped by the
Task must be given a chance to catch the cancellation with a try/except or
try/finally.

Unfortunately it's hard to explain exactly how tasks are scheduled in fewer
words than the implementation -- the only easy property that as long as you
don't explicitly play with threads, task switches only happen at "yield
from" points -- but that doesn't help much because most operations are
implemented using a multitude of tasks. Also it looks like you're using
Trollius, and I don't actually understand the details of how things work
there.

All in all it is still quite possible that there's a bug in your code, but
it feels likely that this is a race in our Queue API and we will have to
add a timeout to the Queue.get() and Queue.put() APIs to fix it. A third
possible outcome is that there may be a way to write your code that avoids
the race, and we may decide that we should just document that, because I
really don't want to have to add timeout parameters to every method. But if
Queues really are special enough we'll do it!

Anyway, it would be well worth your while if you could reproduce this in a
simple test case (not using the rest of your own code) and file a bug (I
prefer bugs in the tulip project, but asyncio bugs filed in the Python
tracker will also reach me).




On Tue, Aug 19, 2014 at 6:09 PM, Christopher Foo <[email protected]>
wrote:

> TL;DR: How do I use timeouts with "Queue.get()"?
>
> I am currently porting my web crawler from Tornado to Asyncio using
> Trollius.
>
> I seem to have noticed that using ".wait()" or ".wait_for()" on
> "Queue.get()" will sometimes lose items. I assumed that since "Queue.get()"
> does not have a timeout parameter, "wait_for()" should be used.
>
> I first noticed that my unit tests would occasionally hang. In this
> situation, I am using a queue to send RPC messages to PhantomJS via stdin
> and stdout. I have a coroutine that is run as ".async()" which has a while
> loop that consumes the queue and sends it off to PhantomJS. Since PhantomJS
> blocks on "readline()", I decided to timeout every 0.1 seconds.
>
> Occasionally, a unit test would fail but the logs for PhantomJS say it's
> reading things every 0.1 seconds, but it never received a specific RPC
> message. I switched to "Queue.get_nowait()" and ".sleep()" (so no more
> ".wait_for()") and that seems to solved the problem. You can see the
> changes at:
> https://github.com/chfoo/wpull/commit/5d52a144ec6a00e66b9409ec1524d4f87eeda4bb
>
> Now I ran into another situation where I am losing items. I have another
> producer-consumer situation where the number of consumers may change and
> consumers are implicitly producing items. In this case, I decided to
> implement two queues: one for items (JoinableQueue) and one for poison
> pills (Queue). Each consumer will ".wait()" with "FIRST_COMPLETED" on both
> queues using "Queue.get()". If the consumer gets one item, either process
> the item or die. The other task is cancelled. If two items, do the item
> first and then die. In both cases after processing the item,
> "JoinableQueue.task_done()" is called.
>
> However, I seem to be losing items since the producer occasionally calls
> "JoinableQueue.join()" when it shouldn't. When there are no more items from
> the source, it checks if any consumers are alive so it can restart
> producing just in case the consumers implicitly produce more items. But
> this hangs forever as an item was lost and the consumers never seen the
> item. So the number of items not done is still above 0. You can see the
> code at
> https://github.com/chfoo/wpull/blob/d255199506d69816fcd39f3eb950bdc106e510ae/wpull/engine.py#L100
> .
>
> Am I misunderstanding the point of ".wait()"/".wait_for()"? What does a
> task have to do when it gets cancelled? Should I assume that all tasks are
> scheduled and running during ".wait()"? Am I supposed to assume that any
> cancelled task is a task that has completed but we are discarding its
> results? Why doesn't "Queue.get()" offer a timeout parameter?
>
> Thanks in advance.
>
>


-- 
--Guido van Rossum (python.org/~guido)

Reply via email to