I took your advice and was able to rewrite the code so that I don't need to use "wait()" or cancel tasks.
Instead I am using a PriorityQueue for items and poison pills, a JoinableQueue for tokens that represent items in progress, and a BoundedSemaphore to bound the number of items in the queue while keeping poison pills unbounded. (You can see the changes at https://github.com/chfoo/wpull/commit/28493a2ca4ba07688a62e36f5d88855c60338f7c )

So the good news is that I don't think I will ever need wait()/wait_for() for queues. I think it would be a good idea for the documentation to mention that wait()/wait_for() doesn't provide the usual guarantees of a synchronization primitive, and that code relying on it should be rewritten. (Rough sketches of the new arrangement, and of the kind of small test case you asked for, are at the end of this message, below the quoted thread.)

On Tuesday, August 19, 2014 9:45:32 PM UTC-4, Guido van Rossum wrote:
>
> Note: I haven't looked at the Tulip code yet (and won't have time until next week -- I'm on a much-needed vacation).
>
> It looks possible that when you call .get() on a Queue wrapped in a timeout (e.g. using wait_for()) you could see a timeout while the Future representing the result is still going to get its result. This is because wait_for() doesn't have very strong guarantees. In particular, cancelling a Task takes effect after some delay, because the generator wrapped by the Task must be given a chance to catch the cancellation with a try/except or try/finally.
>
> Unfortunately it's hard to explain exactly how tasks are scheduled in fewer words than the implementation -- the only easy property is that as long as you don't explicitly play with threads, task switches only happen at "yield from" points -- but that doesn't help much because most operations are implemented using a multitude of tasks. Also, it looks like you're using Trollius, and I don't actually understand the details of how things work there.
>
> All in all it is still quite possible that there's a bug in your code, but it feels likely that this is a race in our Queue API and we will have to add a timeout to the Queue.get() and Queue.put() APIs to fix it. A third possible outcome is that there may be a way to write your code that avoids the race, and we may decide that we should just document that, because I really don't want to have to add timeout parameters to every method. But if Queues really are special enough, we'll do it!
>
> Anyway, it would be well worth your while if you could reproduce this in a simple test case (not using the rest of your own code) and file a bug (I prefer bugs in the tulip project, but asyncio bugs filed in the Python tracker will also reach me).
>
> On Tue, Aug 19, 2014 at 6:09 PM, Christopher Foo <[email protected]> wrote:
>
>> TL;DR: How do I use timeouts with "Queue.get()"?
>>
>> I am currently porting my web crawler from Tornado to asyncio using Trollius.
>>
>> I seem to have noticed that using ".wait()" or ".wait_for()" on "Queue.get()" will sometimes lose items. I assumed that since "Queue.get()" does not have a timeout parameter, "wait_for()" should be used.
>>
>> I first noticed that my unit tests would occasionally hang. In this situation, I am using a queue to send RPC messages to PhantomJS via stdin and stdout. I have a coroutine, run with ".async()", that has a while loop which consumes the queue and sends the messages off to PhantomJS. Since PhantomJS blocks on "readline()", I decided to time out every 0.1 seconds.
>>
>> Occasionally, a unit test would fail: the PhantomJS logs say it is reading every 0.1 seconds, but it never received a specific RPC message.
>> I switched to "Queue.get_nowait()" and ".sleep()" (so no more ".wait_for()"), and that seems to have solved the problem. You can see the changes at https://github.com/chfoo/wpull/commit/5d52a144ec6a00e66b9409ec1524d4f87eeda4bb
>>
>> Now I have run into another situation where I am losing items. I have another producer-consumer setup where the number of consumers may change and the consumers implicitly produce items. In this case, I decided to implement two queues: one for items (JoinableQueue) and one for poison pills (Queue). Each consumer ".wait()"s with "FIRST_COMPLETED" on a "Queue.get()" from both queues. If the consumer gets one item, it either processes the item or dies; the other task is cancelled. If it gets two items, it does the item first and then dies. In both cases, after processing the item, "JoinableQueue.task_done()" is called.
>>
>> However, I seem to be losing items, since the producer occasionally calls "JoinableQueue.join()" when it shouldn't. When there are no more items from the source, it checks whether any consumers are alive so it can restart producing, just in case the consumers implicitly produce more items. But this hangs forever, as an item was lost and the consumers never saw it, so the number of items not done stays above 0. You can see the code at https://github.com/chfoo/wpull/blob/d255199506d69816fcd39f3eb950bdc106e510ae/wpull/engine.py#L100 .
>>
>> Am I misunderstanding the point of ".wait()"/".wait_for()"? What does a task have to do when it gets cancelled? Should I assume that all tasks are scheduled and running during ".wait()"? Am I supposed to assume that any cancelled task is a task that has completed but whose results we are discarding? Why doesn't "Queue.get()" offer a timeout parameter?
>>
>> Thanks in advance.
>
>
> --
> --Guido van Rossum (python.org/~guido)
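
Here is a rough sketch of the new arrangement, for anyone following along. It is not the actual wpull code: the class, the names, the pill ordering, and the point where the semaphore slot is released are all made up for illustration, and it is written with modern async/await and a plain Queue with task_done()/join() instead of Trollius's yield From() and JoinableQueue.

import asyncio
import itertools

# Rough sketch only -- not the wpull implementation. One PriorityQueue holds
# both real items and poison pills, a second queue holds one "in progress"
# token per item so join() works, and a BoundedSemaphore bounds how many real
# items may sit in the queue while poison pills bypass the bound entirely.

POISON_PRIORITY = 0   # guess: pills sort first so idle workers stop promptly
ITEM_PRIORITY = 1

class ItemQueue:
    def __init__(self, max_items=100):
        self._queue = asyncio.PriorityQueue()      # items and poison pills
        self._in_progress = asyncio.Queue()        # tokens for unfinished items
        self._item_slots = asyncio.BoundedSemaphore(max_items)
        self._order = itertools.count()            # tie-breaker for equal priorities

    async def put_item(self, item):
        await self._item_slots.acquire()           # blocks while max_items are queued
        self._in_progress.put_nowait(None)         # token: item must be completed
        self._queue.put_nowait((ITEM_PRIORITY, next(self._order), item))

    def put_poison(self):
        # No semaphore: any number of poison pills may be queued.
        self._queue.put_nowait((POISON_PRIORITY, next(self._order), None))

    async def get(self):
        _, _, item = await self._queue.get()       # plain get(); no wait_for() needed
        if item is not None:
            self._item_slots.release()             # a real item left the queue
        return item                                # None means "stop"

    def item_done(self):
        self._in_progress.get_nowait()
        self._in_progress.task_done()

    async def join(self):
        await self._in_progress.join()             # returns once every item is done

A consumer then loops on a plain get() and calls item_done() after each real item, for example:

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:                           # poison pill
            return
        try:
            await handle(item)                     # hypothetical work function
        finally:
            queue.item_done()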

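And here is an attempt at the kind of small, self-contained test case you asked for. It is only a sketch of the shape such a test might take (none of the names come from wpull, and it uses modern syntax rather than Trollius). If the suspected race exists, a wait_for() timeout can cancel a get() that has already been handed an item, so the count printed at the end comes up short or the script stalls waiting for the lost sentinel; a Queue implementation without the race should report every item.

import asyncio

ITEMS = 1000

async def consumer(queue, received):
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=0.001)
        except asyncio.TimeoutError:
            # Suspected race: the timed-out get() may already have been given
            # an item, which is then silently dropped.
            continue
        if item is None:                      # sentinel: producer is finished
            return
        received.append(item)

async def main():
    queue = asyncio.Queue()
    received = []
    task = asyncio.ensure_future(consumer(queue, received))

    for i in range(ITEMS):
        await queue.put(i)
        await asyncio.sleep(0.001)            # interleave puts with the timeouts

    await queue.put(None)                     # tell the consumer to stop
    try:
        await asyncio.wait_for(task, timeout=5)
    except asyncio.TimeoutError:
        task.cancel()                         # even the sentinel went missing
    print('received %d of %d items' % (len(received), ITEMS))

asyncio.run(main())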