TL;DR: How do I use timeouts with "Queue.get()"?

I am currently porting my web crawler from Tornado to asyncio using Trollius.
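"Queue.get()" takes no timeout argument. The two ways of bounding the wait that come up in this post are wrapping it in "wait_for()" and polling with "get_nowait()" plus "sleep()". The sketch below shows both. It assumes modern asyncio with async/await rather than Trollius. The helpers get_with_timeout, get_polling and demo are hypothetical names, and the sketch assumes None is never put on the queue as a real item. It only illustrates the two patterns. It does not reproduce or explain the lost items described below.

```python
import asyncio
from typing import Any, Optional


async def get_with_timeout(queue: asyncio.Queue, timeout: float) -> Optional[Any]:
    """Hypothetical helper: next item, or None if nothing arrives in time.

    Assumes None is never used as a real queue item.
    """
    try:
        # wait_for() cancels the pending get() when the timeout expires.
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def get_polling(queue: asyncio.Queue, timeout: float,
                      interval: float = 0.1) -> Optional[Any]:
    """Hypothetical polling variant: get_nowait() plus sleep(), no cancellation."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        try:
            # get_nowait() never suspends, so there is no pending get() to cancel.
            return queue.get_nowait()
        except asyncio.QueueEmpty:
            if loop.time() >= deadline:
                return None
            await asyncio.sleep(interval)


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results = [await get_with_timeout(queue, 0.05)]           # empty -> None
    queue.put_nowait("rpc-1")
    results.append(await get_with_timeout(queue, 0.05))       # -> "rpc-1"
    results.append(await get_polling(queue, 0.05, 0.01))      # empty -> None
    queue.put_nowait("rpc-2")
    results.append(await get_polling(queue, 0.05, 0.01))      # -> "rpc-2"
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints [None, 'rpc-1', None, 'rpc-2']
```

The polling version trades latency (up to one `interval`) for never having a suspended "get()" that has to be cancelled.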
I have noticed that using ".wait()" or ".wait_for()" on "Queue.get()" sometimes loses items. Since "Queue.get()" does not have a timeout parameter, I assumed that "wait_for()" was the way to add one.

I first noticed this because my unit tests would occasionally hang. In that case, I use a queue to send RPC messages to PhantomJS over stdin and stdout. A coroutine started with ".async()" runs a while loop that consumes the queue and sends each message to PhantomJS. Since PhantomJS blocks on "readline()", I made the loop time out every 0.1 seconds. Occasionally a unit test would fail: the PhantomJS logs showed it reading every 0.1 seconds, yet it never received one particular RPC message. I switched to "Queue.get_nowait()" and ".sleep()" (so no more ".wait_for()"), and that seems to have solved the problem. You can see the changes at: https://github.com/chfoo/wpull/commit/5d52a144ec6a00e66b9409ec1524d4f87eeda4bb

Now I have run into another situation where I am losing items. It is another producer-consumer setup, where the number of consumers may change and the consumers implicitly produce items. Here I implemented two queues: one for items (JoinableQueue) and one for poison pills (Queue). Each consumer calls ".wait()" with "FIRST_COMPLETED" on two "Queue.get()" calls, one per queue. If only one of them completes, the consumer either processes the item or dies, depending on which queue it came from, and the other task is cancelled. If both complete, it processes the item first and then dies. In both cases, "JoinableQueue.task_done()" is called after the item is processed.

However, I seem to be losing items, because the producer occasionally calls "JoinableQueue.join()" when it shouldn't. When the source runs out of items, the producer checks whether any consumers are still alive, so that it can resume producing in case the consumers implicitly produce more items. But this hangs forever: an item was lost and the consumers never saw it, so the count of unfinished items stays above 0.
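A simplified sketch of this two-queue consumer follows. It is written for modern asyncio rather than Trollius and is not the actual wpull code, which is in the engine.py link that follows. It assumes current asyncio, where "asyncio.Queue" itself provides "join()" and "task_done()". The names consumer and demo are hypothetical. The design choice shown is to cancel the losing "get()" and await it before inspecting results, and to check both tasks, because both gets can finish in the same event-loop iteration.

```python
import asyncio
from typing import Any, List


async def consumer(items: asyncio.Queue, pills: asyncio.Queue,
                   processed: List[Any]) -> None:
    """Hypothetical consumer: handle items until a poison pill arrives."""
    while True:
        item_task = asyncio.create_task(items.get())
        pill_task = asyncio.create_task(pills.get())
        done, pending = await asyncio.wait(
            {item_task, pill_task}, return_when=asyncio.FIRST_COMPLETED)

        # Cancel the loser and wait for the cancellation to be processed
        # before looking at either task.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

        # Both gets may have completed in the same loop iteration, so check
        # each task instead of assuming exactly one is in `done`.
        got_item = item_task.done() and not item_task.cancelled()
        got_pill = pill_task.done() and not pill_task.cancelled()

        if got_item:
            processed.append(item_task.result())  # stand-in for real work
            items.task_done()
        if got_pill:
            return  # item (if any) was handled first, then die


async def demo() -> List[Any]:
    items: asyncio.Queue = asyncio.Queue()
    pills: asyncio.Queue = asyncio.Queue()
    processed: List[Any] = []
    for n in range(5):
        items.put_nowait(n)
    workers = [asyncio.create_task(consumer(items, pills, processed))
               for _ in range(2)]
    await items.join()          # returns once every item got task_done()
    for _ in workers:
        pills.put_nowait(None)  # one poison pill per consumer
    await asyncio.gather(*workers)
    return processed


if __name__ == "__main__":
    print(sorted(asyncio.run(demo())))  # prints [0, 1, 2, 3, 4]
```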
You can see the code at: https://github.com/chfoo/wpull/blob/d255199506d69816fcd39f3eb950bdc106e510ae/wpull/engine.py#L100

Am I misunderstanding the point of ".wait()"/".wait_for()"? What does a task have to do when it gets cancelled? Should I assume that all tasks are scheduled and running during ".wait()"? Should I assume that any cancelled task is one that has completed but whose result is being discarded? Why doesn't "Queue.get()" offer a timeout parameter?

Thanks in advance.
