On Wed, 15 Feb 2017 08:35:09 -0800, jn...@jnthn.net wrote:
> Hope this helps, and I'll keep the issue under consideration.

So the time came to tackle getting supply/react/whenever syntax capable of 
playing nice with non-blocking await, and I decided as part of those changes to 
look at both this problem and also the more general problem of lack of good 
back-pressure and, related to that, lack of fairness when using the 
supply/react syntax.

To recap, until very recently, a `supply` or `react` block instance had its own 
processing queue. If it was empty, the emitting thread would enter and run the 
required code. If any messages were emitted to it in the meantime, they would 
be queued asynchronously. When the sender of the currently-being-processed 
message was done, it would check if there was anything added to the queue in 
the meantime, and if so it would process those too. This mechanism also handled 
recursive messages by queuing them up (this occurs when some code running in a 
`supply` block instance results in another emit being sent to the same `supply` 
block instance). The asynchronous queuing, however, meant that the cost of 
processing a message didn't push back on senders as it should have.

I've just finished (I hope :-)) re-working the Supply internals to instead use 
asynchronous locking throughout. An asynchronous lock returns a Promise that 
will already be Kept if the lock is already available, or will be Kept once it 
is available. Multiple contenders queue for the lock. This, in combination with 
non-blocking `await` of the Promise, forms the new supply concurrency control 
model, used consistently in `supply` blocks and elsewhere in the supplies 
implementation (previously, the code elsewhere used a real mutex, which gave 
its own set of issues).

On its own this cannot replace the previous mechanism, however, because the 
queuing was used in preventing various forms of deadlock, especially recursion. 
It also would cause problems for any `whenever` tapping a `supply` that emitted 
values synchronously after being tapped (as in your case). The former is 
resolved by detecting lock recursion and, in that case falling back to queuing 
the work to run later, using the thread pool. The latter is resolved with a 
custom Awaiter implementation: if anything during the processing of setting up 
a `whenever` block does an `await`, a continuation is taken, and then - after 
the setup work of the `supply` block is completed - the continuations are 
invoked.

This latter case is relevant to the original subject to this ticket, because 
with the supply concurrency control mechanism now being asynchronous locking, 
the outcome of the `emit` that previously queued endlessly is now an `await` 
instead. Thus the setup of the consumer is allowed to complete, before the 
producer is resumed. Any further awaits are also collected and handled in the 
same way, until we run out of them. The effect is that if we rewrite the 
original code (to use the CLOSE phaser, not a hack with a role):

sub make-supply() {
    supply { 
        until my $done {
            say "Emitting ...";
            emit(++$);
        }   
        CLOSE $done = True;
    } 
}  

my $s2 = make-supply;
react {
    whenever $s2 -> $n {
        say "Received $n";
        done if $n >= 5;
    }   
}

Then it will produce:

Emitting ...
Received 1
Emitting ...
Received 2
Emitting ...
Received 3
Emitting ...
Received 4
Emitting ...
Received 5

Furthermore, if it is written as just:

sub make-supply() {
    supply { 
        loop {
            say "Emitting ...";
            emit(++$);
        }   
    }
}   

my $s2 = make-supply;
react {
    whenever $s2 -> $n {
        say "Received $n";
        done if $n >= 5;
    }   
}   

The output is similar:

Emitting ...
Received 1
Emitting ...
Received 2
Emitting ...
Received 3
Emitting ...
Received 4
Emitting ...
Received 5
Emitting ...

Note the one extra "Emitting ...". The `emit` operation will now check if the 
supply block is still active; in this case, it was closed by its consumer, so 
it won't bother emitting and won't bother resuming either (emit is a control 
exception, which is why we can unwind the stack and thus exit the loop).

Finally, this model means that:

sub make-supply() {
    supply { 
        my $i = 0;
        loop {
            say "Emitting 2000 messages" if $i %% 2000;
            emit(++$i);
        }   
    }   
}   

my $s2 = make-supply;
react {
    my $received = 0;
    whenever $s2 -> $n {
        $received++;
    }   
    whenever Promise.in(1) {
        say "Received $received messages";
        done;
    }
}   

Works - as in, the second `whenever` block gets its fair chance to have a 
message processed too, so the output is something like:

Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Received 17291 messages

I've added some tests to S17-supply/syntax.t. Thanks for the original ticket; 
hopefully this solution will make things better for those writing "source" 
supplies.

Reply via email to