On 28 March 2018 at 18:50, Wenbo Zhu wrote:
>
>>> Any useful RS support will have to involve some application-level
>>> protocols.
>>>
>>
>> I completely disagree with this statement. As someone who has been
>> involved in/led the implementation of multiple WebSocket clients and
>> servers (including Play Framework, Lagom Framework and Akka HTTP), I can
>> tell you from my experience that after the initial WebSocket handshake,
>> WebSockets can be offered to application developers as just streams of
>> messages using Reactive Streams, without any additional WebSocket protocol
>> exposure, and still be useful for 99% of use cases. It certainly does not
>> have to involve exposing the protocol to the application; most applications
>> care nothing about message fragmentation, nothing about close codes or
>> reasons beyond whether the connection terminated successfully or with an
>> error, and nothing about pings/pongs. Everything that they do care about,
>> Reactive Streams gives a way of expressing.
>>
>
> I don't see the role of RS if the WS wire protocol doesn't support any
> signaling for the receiver to notify the sender to "slow down", except to
> respect the basic TCP flow-control.
>
> I am reasonably familiar with the IETF WS spec (
> https://tools.ietf.org/html/rfc6455) ... and it would be useful if you
> could point out how exactly flow-control is supposed to work with RS on top
> of Websockets.
>
I'm not sure how familiar you are with Reactive Streams, so let me start by
explaining its backpressure mechanism.

RS backpressure works by a Subscriber holding a Subscription object, which
provides a method request(n: Long). A Publisher may only invoke a
Subscriber's onNext method as many times as the total demand that has been
signalled through request. So, if request(8) has been invoked, the
Publisher is allowed to invoke onNext 8 times with 8 elements. The
outstanding demand is cumulative: if request(8) is invoked and then
request(4) is invoked, the Publisher is allowed to invoke onNext 12 times
altogether. Backpressure is therefore implemented in RS by a Subscriber
letting the outstanding demand reach 0, and not invoking request again
until it's ready to receive more elements.
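
To make the demand accounting concrete, here is a minimal sketch using java.util.concurrent.Flow, the JDK 9+ interfaces that mirror the Reactive Streams API. The class names BufferingPublisher and RecordingSubscriber are hypothetical. The Publisher is deliberately simplified (single-threaded, one subscriber, no validation of request(n)), so it is not spec-compliant. It shows request(8) followed by request(4) allowing exactly 12 onNext calls, with the remaining elements left buffered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

public class DemandDemo {

    // Simplified, single-threaded Publisher: elements arrive from a source via
    // offer() and are handed to the Subscriber only while demand > 0.
    // Not a complete Reactive Streams implementation: one subscriber, no
    // thread safety, no onComplete/onError, no validation of request(n).
    static final class BufferingPublisher implements Flow.Publisher<String> {
        private final Queue<String> buffer = new ArrayDeque<>();
        private Flow.Subscriber<? super String> subscriber;
        private long demand = 0; // outstanding demand, cumulative across request() calls

        @Override
        public void subscribe(Flow.Subscriber<? super String> s) {
            subscriber = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) {
                    demand += n; // request(8) followed by request(4) leaves demand at 12
                    drain();
                }
                @Override public void cancel() { subscriber = null; }
            });
        }

        // Called when the source (e.g. a frame decoder) produces a new element.
        void offer(String element) {
            buffer.add(element);
            drain();
        }

        // Emit buffered elements while there is demand; the rest stay buffered.
        private void drain() {
            while (subscriber != null && demand > 0 && !buffer.isEmpty()) {
                demand--;
                subscriber.onNext(buffer.poll());
            }
        }

        int buffered() { return buffer.size(); }
        long outstandingDemand() { return demand; }
    }

    // Subscriber that records elements and leaves requesting to the caller.
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        BufferingPublisher publisher = new BufferingPublisher();
        RecordingSubscriber subscriber = new RecordingSubscriber();
        publisher.subscribe(subscriber);

        subscriber.subscription.request(8);
        subscriber.subscription.request(4); // outstanding demand is now 12
        for (int i = 0; i < 20; i++) {
            publisher.offer("frame-" + i); // 20 decoded frames arrive
        }
        // prints "12 delivered, 8 still buffered"
        System.out.println(subscriber.received.size() + " delivered, "
                + publisher.buffered() + " still buffered");
    }
}
```

Because onNext is only ever called from drain(), the Publisher cannot emit more elements than have been requested. Everything else waits in its buffer, much like decoded frames waiting for demand.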

When a Publisher reaches this state, where outstanding demand is zero, it
stops consuming data from its data source. In the case of WebSockets, this
is a TCP socket. Assuming an NIO-based implementation, in practice this
means removing read interest (OP_READ) from that channel's selection key.
At this point it's worth pointing out that there will be buffers along the
way: there may be partially decoded WebSocket frames sitting in a buffer,
and there may be fully decoded WebSocket frames that are ready to be
emitted but can't be, because demand is zero. So buffering does happen
here, and deregistering interest in reading doesn't propagate backpressure
immediately. But eventually the OS receive buffer fills up, and the OS
then advertises a receive window of zero in its ACKs. This prevents the
remote system from sending any more data.
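
Toggling read interest could look roughly like this in Java NIO. This is a sketch under assumptions. updateReadInterest and probe are hypothetical helpers, and a java.nio.channels.Pipe stands in for the TCP socket because both are selectable channels. It clears OP_READ rather than cancelling the key, so reading can resume once demand returns.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class ReadInterestDemo {

    // Toggle OP_READ on a registered channel according to outstanding demand.
    // Clearing OP_READ keeps the key registered; the selector simply stops
    // reporting the channel as readable, so the application stops pulling
    // bytes out of the OS receive buffer and TCP flow control takes over.
    static void updateReadInterest(SelectionKey key, long outstandingDemand) {
        int ops = key.interestOps();
        if (outstandingDemand > 0) {
            key.interestOps(ops | SelectionKey.OP_READ);
        } else {
            key.interestOps(ops & ~SelectionKey.OP_READ);
        }
    }

    // Returns {ready keys while demand is zero, ready keys once demand is restored}.
    static int[] probe() throws IOException {
        Pipe pipe = Pipe.open(); // stands in for a TCP socket: both are selectable channels
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);

            sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3})); // bytes arrive from the "remote" end

            updateReadInterest(key, 0);                   // demand is zero: stop reading
            int readyWhilePaused = selector.selectNow();  // data is waiting but is not reported

            updateReadInterest(key, 8);                   // demand restored: resume reading
            int readyAfterResume = selector.select(1000); // wait up to 1 s for readability
            return new int[] {readyWhilePaused, readyAfterResume};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ready = probe();
        System.out.println("ready while paused: " + ready[0] + ", after resume: " + ready[1]);
    }
}
```

While OP_READ is cleared, the three bytes stay in the OS buffer unread. A real implementation would do the same with a SocketChannel, and the OS would eventually shrink the advertised TCP window.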

How that then affects the WebSocket endpoint on the remote system depends
on that endpoint's implementation. If it's using blocking IO, then once the
remote OS's send buffer is full, any calls to send new messages will block.
Thus, backpressure will have been propagated to the remote WebSocket
endpoint. If it's using non-blocking IO with a backpressure mechanism, for
example Reactive Streams, then the Subscriber writing to the socket will
stop requesting more elements. Demand will reach zero, so the Publisher
will be unable to send more data. The Publisher will then translate this
into whatever backpressure mechanism its data source offers: if, say, it's
receiving messages from a message queue, it will stop consuming those
messages. Thus, backpressure will have been propagated.
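
On the sending side, a Publisher backed by a message queue only needs to take messages from the queue when there is demand. Below is a minimal sketch, again using java.util.concurrent.Flow. QueuePublisher is hypothetical, and an in-memory Queue stands in for a real broker client. Unrequested messages therefore stay in the queue, which is where the broker's own flow control would apply.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

public class QueueSourceDemo {

    // Publisher whose data source is a message queue (an in-memory Queue
    // standing in for a broker client). A message is removed from the queue
    // only when the Subscriber has outstanding demand, so while demand is zero
    // messages stay unconsumed in the queue. Simplified: single-threaded, one
    // subscriber, no completion/error signals, and no notification when new
    // messages arrive (leftover demand is served on the next request call).
    static final class QueuePublisher implements Flow.Publisher<String> {
        private final Queue<String> messageQueue;

        QueuePublisher(Queue<String> messageQueue) { this.messageQueue = messageQueue; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand = 0;
                private boolean cancelled = false;

                @Override public void request(long n) {
                    demand += n;
                    // Consume only as many messages as have been requested.
                    while (demand > 0 && !cancelled) {
                        String message = messageQueue.poll();
                        if (message == null) break; // nothing to consume right now
                        demand--;
                        subscriber.onNext(message);
                    }
                }

                @Override public void cancel() { cancelled = true; }
            });
        }
    }

    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        Queue<String> broker = new ArrayDeque<>(List.of("m1", "m2", "m3", "m4", "m5"));
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new QueuePublisher(broker).subscribe(subscriber);

        subscriber.subscription.request(2); // demand for two messages only
        // prints "consumed [m1, m2], left in queue [m3, m4, m5]"
        System.out.println("consumed " + subscriber.received + ", left in queue " + broker);
    }
}
```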

Another case is that it might be using non-blocking IO with no backpressure
mechanism. This is unfortunately the case for many WebSocket
implementations, including web browsers and the Java EE WebSocket API. In
that case, the WebSocket implementation has three options: buffer, drop,
or fail, or a combination of these. If it only buffers, it will eventually
run out of memory. Probably the most common implementation is to buffer up
to a point and then fail. In that case backpressure has been propagated,
though not gracefully: the result is a failure. But the important thing to
realise here is that the side receiving the messages has been able to use
backpressure to protect itself from running out of memory. Without a
backpressure mechanism there would have been no TCP backpressure, and it
would have had to accept data at whatever rate the remote end produced it,
either buffering it and potentially running out of memory, or failing.
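
For contrast, an endpoint without a backpressure mechanism can only buffer, drop, or fail. The hypothetical BoundedSendBuffer below sketches the common buffer-up-to-a-point-then-fail strategy. It is not modelled on any particular WebSocket implementation, and the limit is arbitrary.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BoundedSendBufferDemo {

    // Models a non-blocking sender with no backpressure API: send() never
    // blocks and cannot tell the caller to slow down. When the socket stops
    // accepting data (the peer's TCP window is zero), outgoing messages pile
    // up here. Past a fixed limit this sketch fails rather than dropping.
    static final class BoundedSendBuffer {
        private final Deque<String> pending = new ArrayDeque<>();
        private final int maxPending;

        BoundedSendBuffer(int maxPending) { this.maxPending = maxPending; }

        void send(String message) {
            if (pending.size() >= maxPending) {
                throw new IllegalStateException(
                        "send buffer full (" + maxPending + " messages), failing the connection");
            }
            pending.addLast(message);
        }

        // Called when the socket becomes writable again; removes one message.
        String flushOne() { return pending.pollFirst(); }

        int pendingCount() { return pending.size(); }
    }

    public static void main(String[] args) {
        BoundedSendBuffer buffer = new BoundedSendBuffer(3);
        try {
            // The remote receiver has stopped reading, so nothing is ever flushed.
            for (int i = 0; ; i++) {
                buffer.send("msg-" + i);
            }
        } catch (IllegalStateException e) {
            System.out.println("failed after buffering " + buffer.pendingCount()
                    + " messages: " + e.getMessage());
        }
    }
}
```

The receiving side stays safe either way. Its memory use is bounded by its own buffers and the TCP window, and the cost of the missing backpressure mechanism falls on the sender as a failure.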
So, as you can see, WebSockets don't need an explicit