On Wed, 15 Feb 2017 08:35:09 -0800, jn...@jnthn.net wrote: > Hope this helps, and I'll keep the issue under consideration.
So the time came to tackle getting supply/react/whenever syntax capable of playing nice with non-blocking await, and I decided as part of those changes to look at both this problem and also the more general problem of lack of good back-pressure and, related to that, lack of fairness when using the supply/react syntax. To recap, until very recently, a `supply` or `react` block instance had its own processing queue. If it was empty, the emitting thread would enter and run the required code. If any messages were emitted to it in the meantime, they would be queued asynchronously. When the sender of the currently-being-processed message was done, it would check if there was anything added to the queue in the meantime, and if so it would process those too. This mechanism also handled recursive messages by queuing them up (this occurs when some code running in a `supply` block instance results in another emit being sent to the same `supply` block instance). The asynchronous queuing, however, meant that the cost of processing a message didn't push back on senders as it should have. I've just finished (I hope :-)) re-working the Supply internals to instead use asynchronous locking throughout. An asynchronous lock returns a Promise that will already be Kept if the lock is already available, or will be Kept once it is available. Multiple contenders queue for the lock. This, in combination with non-blocking `await` of the Promise, forms the new supply concurrency control model, used consistently in `supply` blocks and elsewhere in the supplies implementation (previously, the code elsewhere used a real mutex, which gave its own set of issues). On its own this cannot replace the previous mechanism, however, because the queuing was used in preventing various forms of deadlock, especially recursion. It also would cause problems for any `whenever` tapping a `supply` that emitted values synchronously after being tapped (as in your case). The former is resolved by detecting lock recursion and, in that case falling back to queuing the work to run later, using the thread pool. The latter is resolved with a custom Awaiter implementation: if anything during the processing of setting up a `whenever` block does an `await`, a continuation is taken, and then - after the setup work of the `supply` block is completed - the continuations are invoked. This latter case is relevant to the original subject to this ticket, because with the supply concurrency control mechanism now being asynchronous locking, the outcome of the `emit` that previously queued endlessly is now an `await` instead. Thus the setup of the consumer is allowed to complete, before the producer is resumed. Any further awaits are also collected and handled in the same way, until we run out of them. The effect is that if we rewrite the original code (to use the CLOSE phaser, not a hack with a role): sub make-supply() { supply { until my $done { say "Emitting ..."; emit(++$); } CLOSE $done = True; } } my $s2 = make-supply; react { whenever $s2 -> $n { say "Received $n"; done if $n >= 5; } } Then it will produce: Emitting ... Received 1 Emitting ... Received 2 Emitting ... Received 3 Emitting ... Received 4 Emitting ... Received 5 Furthermore, if it is written as just: sub make-supply() { supply { loop { say "Emitting ..."; emit(++$); } } } my $s2 = make-supply; react { whenever $s2 -> $n { say "Received $n"; done if $n >= 5; } } The output is similar: Emitting ... Received 1 Emitting ... Received 2 Emitting ... Received 3 Emitting ... Received 4 Emitting ... Received 5 Emitting ... Note the one extra "Emitting ...". The `emit` operation will now check if the supply block is still active; in this case, it was closed by its consumer, so it won't bother emitting and won't bother resuming either (emit is a control exception, which is why we can unwind the stack and thus exit the loop). Finally, this model means that: sub make-supply() { supply { my $i = 0; loop { say "Emitting 2000 messages" if $i %% 2000; emit(++$i); } } } my $s2 = make-supply; react { my $received = 0; whenever $s2 -> $n { $received++; } whenever Promise.in(1) { say "Received $received messages"; done; } } Works - as in, the second `whenever` block gets its fair chance to have a message processed too, so the output is something like: Emitting 2000 messages Emitting 2000 messages Emitting 2000 messages Emitting 2000 messages Emitting 2000 messages Emitting 2000 messages Emitting 2000 messages Emitting 2000 messages Emitting 2000 messages Received 17291 messages I've added some tests to S17-supply/syntax.t. Thanks for the original ticket; hopefully this solution will make things better for those writing "source" supplies.