TL;DR: How do I use timeouts with "Queue.get()"?

I am currently porting my web crawler from Tornado to Asyncio using 
Trollius. 

I have noticed that using ".wait()" or ".wait_for()" on "Queue.get()" 
sometimes loses items. Since "Queue.get()" does not have a timeout 
parameter, I assumed that "wait_for()" was the way to add one.
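A minimal sketch of the "wait_for() around get()" pattern described above. It uses modern asyncio syntax (async/await, Python 3.7+) rather than Trollius. The helper name `get_with_timeout` and the use of `None` as the "timed out" marker are illustrative assumptions. The sketch does not by itself rule out the lost-item problem; whether a timed-out get() can drop an item depends on how the installed asyncio version implements `Queue.get()` and `wait_for()`.

```python
import asyncio


async def get_with_timeout(queue, timeout):
    """Return the next item from `queue`, or None if `timeout` seconds pass.

    Hypothetical helper: None is the "timed out" marker, so this sketch
    assumes None is never put on the queue as a real item.
    """
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    queue = asyncio.Queue()
    empty_result = await get_with_timeout(queue, 0.1)  # nothing queued yet
    await queue.put("rpc-message")
    item = await get_with_timeout(queue, 0.1)
    return empty_result, item, queue.qsize()
```

Running `asyncio.run(demo())` returns `(None, 'rpc-message', 0)`: the first call times out on the empty queue, the second returns the queued item.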

I first noticed this when my unit tests occasionally hung. In that case, 
I use a queue to send RPC messages to PhantomJS over stdin and stdout. A 
coroutine started with ".async()" runs a while loop that takes messages 
off the queue and sends each one to PhantomJS. Since PhantomJS blocks on 
"readline()", I made the "Queue.get()" time out every 0.1 seconds.
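The sender loop described above might look roughly like this in modern asyncio. The function name `rpc_sender`, the `send` callable (a stand-in for writing to PhantomJS's stdin) and the `None` stop sentinel are assumptions for illustration, not code from the crawler.

```python
import asyncio

POLL_INTERVAL = 0.1  # seconds, the timeout described above


async def rpc_sender(queue, send):
    """Forward queued RPC messages to `send` until a None sentinel arrives.

    `send` is a hypothetical stand-in for writing a line to PhantomJS's stdin.
    """
    while True:
        try:
            message = await asyncio.wait_for(queue.get(), POLL_INTERVAL)
        except asyncio.TimeoutError:
            # Nothing arrived within POLL_INTERVAL; go round the loop again.
            continue
        if message is None:
            return
        send(message)


async def demo():
    queue = asyncio.Queue()
    sent = []
    sender = asyncio.create_task(rpc_sender(queue, sent.append))
    await asyncio.sleep(0.25)  # let the sender time out a couple of times
    for message in ("open", "render", "close"):
        await queue.put(message)
    await queue.put(None)  # sentinel: stop the sender
    await sender
    return sent
```

Each timeout cancels a pending get(), which is exactly the window in which an item could be lost on an implementation that hands the item to the waiting getter before the cancellation lands.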

Occasionally a unit test would fail: the PhantomJS logs showed it reading 
every 0.1 seconds, but it never received one particular RPC message. I 
switched to "Queue.get_nowait()" and ".sleep()" (so no more ".wait_for()"), 
and that seems to have solved the problem. You can see the changes at: 
https://github.com/chfoo/wpull/commit/5d52a144ec6a00e66b9409ec1524d4f87eeda4bb
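The switch from "wait_for()" to "get_nowait()" plus "sleep()" can be sketched as follows (modern asyncio syntax; the function name and the `None` sentinel are assumptions, not taken from the linked commit). Because `get_nowait()` either returns an item immediately or raises `asyncio.QueueEmpty`, no get() is ever left pending and then cancelled.

```python
import asyncio

POLL_INTERVAL = 0.1  # seconds


async def rpc_sender_polling(queue, send):
    """Polling variant of the sender loop.

    get_nowait() either returns an item at once or raises QueueEmpty, so
    there is never a pending get() for a timeout to cancel.
    `send` is a hypothetical stand-in for writing to PhantomJS's stdin.
    """
    while True:
        try:
            message = queue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(POLL_INTERVAL)
            continue
        if message is None:  # sentinel: stop polling
            return
        send(message)


async def demo():
    queue = asyncio.Queue()
    sent = []
    sender = asyncio.create_task(rpc_sender_polling(queue, sent.append))
    await asyncio.sleep(0.25)
    for message in ("open", "render", "close"):
        queue.put_nowait(message)
    queue.put_nowait(None)
    await sender
    return sent
```

The trade-off is latency: an item that arrives just after a poll waits up to one `POLL_INTERVAL` before it is sent.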

Now I have run into another situation where I am losing items. It is 
another producer-consumer setup where the number of consumers may change 
and the consumers implicitly produce items. Here I decided to use two 
queues: one for items (JoinableQueue) and one for poison pills (Queue). 
Each consumer calls ".wait()" with "FIRST_COMPLETED" on a "Queue.get()" 
from each queue. If only one get completes, the consumer either processes 
the item or dies, and the other task is cancelled. If both complete, it 
processes the item first and then dies. In both cases, 
"JoinableQueue.task_done()" is called after the item is processed. 
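A minimal sketch of that two-queue consumer in modern asyncio, where `asyncio.Queue` provides `join()` and `task_done()` itself. The names `consumer` and `process` are hypothetical. Because `cancel()` is only a request, the sketch awaits each cancelled task and checks how it actually ended before discarding it. It does not by itself work around a `Queue.get()` implementation that drops an item when cancelled.

```python
import asyncio


async def consumer(items, pills, process):
    """Consume from `items` until something arrives on `pills`.

    `items` supports task_done(); `pills` carries poison pills.
    `process` is a hypothetical per-item coroutine function.
    """
    while True:
        item_task = asyncio.ensure_future(items.get())
        pill_task = asyncio.ensure_future(pills.get())
        done, pending = await asyncio.wait(
            {item_task, pill_task}, return_when=asyncio.FIRST_COMPLETED)

        for task in pending:
            task.cancel()
        if pending:
            # cancel() may be refused (e.g. the coroutine suppresses the
            # CancelledError), so wait and inspect the real outcome.
            await asyncio.gather(*pending, return_exceptions=True)

        got_item = item_task.done() and not item_task.cancelled()
        got_pill = pill_task.done() and not pill_task.cancelled()

        if got_item:  # process the item first, even if a pill also arrived
            try:
                await process(item_task.result())
            finally:
                items.task_done()
        if got_pill:
            return


async def demo():
    items, pills = asyncio.Queue(), asyncio.Queue()
    seen = []

    async def process(item):
        seen.append(item)

    for n in range(5):
        items.put_nowait(n)
    worker = asyncio.ensure_future(consumer(items, pills, process))
    await items.join()       # returns once every item was task_done()
    pills.put_nowait(None)   # then tell the consumer to exit
    await worker
    return seen
```

`asyncio.run(demo())` returns `[0, 1, 2, 3, 4]`: all five items are processed and marked done before the poison pill stops the consumer.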

However, I seem to be losing items, since the producer occasionally calls 
"JoinableQueue.join()" when it shouldn't. When the source has no more 
items, the producer checks whether any consumers are still alive so it can 
resume producing in case they implicitly produce more items. But this 
hangs forever: an item was lost and no consumer ever saw it, so the count 
of unfinished items never drops to 0. You can see the 
code at 
https://github.com/chfoo/wpull/blob/d255199506d69816fcd39f3eb950bdc106e510ae/wpull/engine.py#L100
.
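The accounting that `join()` relies on can be shown in isolation. In this sketch (modern `asyncio.Queue`), every `get_nowait()` is matched by a `task_done()` in a `finally` block; an item that is taken off the queue and then dropped leaves the unfinished count above zero, so `join()` never returns. The `join_or_report` helper is a hypothetical debugging aid that bounds the wait instead of hanging.

```python
import asyncio


def drain(queue, processed):
    """Take every queued item and mark each one done exactly once."""
    while not queue.empty():
        item = queue.get_nowait()
        try:
            processed.append(item)  # stands in for real processing
        finally:
            queue.task_done()


async def join_or_report(queue, timeout):
    """Hypothetical diagnostic: bound join() instead of hanging forever.

    Returns True if every item was marked done, False on timeout.
    """
    try:
        await asyncio.wait_for(queue.join(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    balanced = asyncio.Queue()
    for n in range(3):
        balanced.put_nowait(n)
    drain(balanced, [])

    unbalanced = asyncio.Queue()
    unbalanced.put_nowait("lost")
    unbalanced.get_nowait()  # taken but never task_done(): a "lost" item

    return (await join_or_report(balanced, 0.1),
            await join_or_report(unbalanced, 0.1))
```

`asyncio.run(demo())` returns `(True, False)`: the balanced queue joins immediately, while the queue with the dropped item would have blocked `join()` indefinitely.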

Am I misunderstanding the point of ".wait()"/".wait_for()"? What does a 
task have to do when it gets cancelled? Should I assume that all tasks are 
scheduled and running during ".wait()"? Am I supposed to assume that any 
cancelled task is a task that has completed but we are discarding its 
results? Why doesn't "Queue.get()" offer a timeout parameter?
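For reference, this is how cancellation behaves in modern asyncio: `cancel()` returns False if the task has already finished (its result is kept), and otherwise a `CancelledError` is raised inside the task at the await it is suspended on, where it can clean up and should re-raise. The names `cancellable_worker` and `quick` are hypothetical, and `asyncio.sleep()` stands in for a blocking get().

```python
import asyncio


async def cancellable_worker(log):
    try:
        await asyncio.sleep(10)  # stands in for a blocking get()
    except asyncio.CancelledError:
        log.append("cleanup")    # release resources, requeue work, etc.
        raise                    # re-raise so the task ends as cancelled


async def quick():
    return "result"


async def demo():
    log = []
    task = asyncio.ensure_future(cancellable_worker(log))
    await asyncio.sleep(0)       # let the worker reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    finished = asyncio.ensure_future(quick())
    await finished
    cancel_accepted = finished.cancel()  # too late: the task already finished
    return log, task.cancelled(), cancel_accepted, finished.result()
```

`asyncio.run(demo())` returns `(['cleanup'], True, False, 'result')`: the first task ran its cleanup and ended cancelled, while the second had already completed, so `cancel()` was refused and its result is still available.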

Thanks in advance.
