On 28 March 2018 at 18:50, Wenbo Zhu <wen...@google.com> wrote:
>
>>> Any useful RS support will have to involve some application-level
>>> protocols.
>>>
>>
>> I completely disagree with this statement. As someone who has been
>> involved in/led the implementation of multiple WebSocket clients and
>> servers (including Play Framework, Lagom Framework and Akka HTTP), I can
>> tell you from my experience that after the initial WebSocket handshake,
>> WebSockets can be offered to application developers as just streams of
>> messages using Reactive Streams, without any additional WebSocket protocol
>> exposure, and still be useful for 99% of use cases. It certainly does not
>> have to involve exposing the protocol to the application, most applications
>> care nothing about message fragmentation, they care nothing about close
>> codes or reasons beyond whether the connection was terminated with a
>> success or error, and they care nothing about pings/pongs. Everything that
>> they do care about, Reactive Streams gives a way of expressing.
>>
>
> I don't see the role of RS if the WS wire protocol doesn't support any
> signaling for the receiver to notify the sender to "slow down", except to
> respect the basic TCP flow-control.
>
> I am reasonably familiar with the IETF WS spec (
> https://tools.ietf.org/html/rfc6455) ... and it would be useful if you
> could point out how exactly flow-control is supposed to work with RS on top
> of Websockets.
>

I'm not sure how familiar you are with Reactive Streams, so let me start by explaining its backpressure mechanism.

RS backpressure works by a Subscriber having a Subscription object. Subscription provides a method request(n: Long). A Publisher may only invoke a Subscriber's onNext method as many times as the number that has been supplied to request. So, if request(8) has been invoked, then the Publisher is allowed to invoke onNext 8 times with 8 elements. The outstanding demand is cumulative, so if request(8) is invoked and then request(4) is invoked, the Publisher is allowed to invoke onNext 12 times altogether.

So backpressure is implemented in RS by a Subscriber letting the outstanding demand reach 0, and not invoking request again until it's ready to receive more elements. When a Publisher reaches this state, where outstanding demand is zero, it will stop consuming data from its source. In the case of WebSockets, this will be a TCP socket. Assuming an NIO based implementation, this will in practice mean that it clears the read interest on that channel's selection key.

Now at this point it's worth pointing out that there will be buffers involved along the way: there may be partially decoded WebSocket frames sitting in a buffer, and there may be fully decoded WebSocket frames that are ready to be emitted but can't be because demand is zero. So, there is buffering done here, and deregistering interest in reading doesn't result in immediate backpressure propagation. But eventually, the OS receive buffer will fill up, and the OS will then send an ack with a receive window of zero. This will prevent the remote system from sending any more data.

Now, how that then impacts the WebSocket endpoint on the remote system depends on that endpoint's implementation. If it's using blocking IO, then, once the remote OS's send buffer is full, any calls to send new messages will block.
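
Before continuing with the remote end, here is a minimal sketch of the demand accounting described above, written against the java.util.concurrent.Flow interfaces. Everything in it other than the Flow types is hypothetical and only for illustration: the DemandSketch and IteratorPublisher names are made up, and a plain Iterator stands in for the socket. It is synchronous and single-threaded, so it is not how a real WebSocket implementation would be written, but it shows the two properties that matter: demand is cumulative, and nothing is pulled from the source once demand reaches zero.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical names throughout; an illustration, not any library's implementation.
final class DemandSketch {

    /** Synchronous Publisher that pulls from its source only while demand is outstanding. */
    static final class IteratorPublisher implements Flow.Publisher<String> {
        private final Iterator<String> source;

        IteratorPublisher(Iterator<String> source) { this.source = source; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;      // outstanding demand, cumulative across request calls
                private boolean emitting; // guards against re-entrant request() from onNext
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("non-positive request"));
                        return;
                    }
                    demand += n;
                    if (demand < 0) demand = Long.MAX_VALUE; // cap on overflow
                    if (emitting) return;
                    emitting = true;
                    try {
                        // Stop touching the source as soon as demand hits zero.
                        while (!done && demand > 0 && source.hasNext()) {
                            demand--;
                            subscriber.onNext(source.next());
                        }
                        if (!done && !source.hasNext()) {
                            done = true;
                            subscriber.onComplete();
                        }
                    } finally {
                        emitting = false;
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Returns [received after request(8), received after request(4), elements pulled from source]. */
    static List<Integer> run() {
        int[] pulled = {0};
        Iterator<String> socket = new Iterator<String>() { // stand-in for the TCP socket
            public boolean hasNext() { return pulled[0] < 20; }
            public String next() { return "m" + pulled[0]++; }
        };
        List<String> received = new ArrayList<>();
        Flow.Subscription[] sub = new Flow.Subscription[1];
        new IteratorPublisher(socket).subscribe(new Flow.Subscriber<String>() {
            public void onSubscribe(Flow.Subscription s) { sub[0] = s; } // no demand signalled yet
            public void onNext(String item) { received.add(item); }
            public void onError(Throwable t) { throw new AssertionError(t); }
            public void onComplete() { }
        });
        List<Integer> out = new ArrayList<>();
        sub[0].request(8);
        out.add(received.size());
        sub[0].request(4);
        out.add(received.size());
        out.add(pulled[0]); // the source was never read beyond the signalled demand
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [8, 12, 12]
    }
}
```

The source has 20 elements available, but only 12 are ever pulled from it, which is the point at which a real implementation would stop reading from the socket. With that in mind, back to the blocking IO case on the remote end.
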
Thus, backpressure will have been propagated to the remote WebSocket endpoint.

If it's using non blocking IO with a backpressure mechanism, for example Reactive Streams, then the subscriber writing to the socket is going to stop requesting demand. Demand will reach zero, causing the publisher to be unable to send more data. The publisher will then translate this to whatever backpressure mechanism its source of data supports; perhaps it's receiving messages from a message queue, in which case it's going to stop consuming those messages. Thus, backpressure will have been propagated.

Another case is that it might be using non blocking IO with no backpressure mechanism. This is unfortunately the case for many WebSocket implementations, including web browsers and the Java EE WebSocket API. In that case, the WebSocket implementation there has three options: buffer, drop, or fail, or a combination of two of those. If it buffers without limit, it will eventually run out of memory. Probably the most common implementation is to buffer up to a point, and then fail. In such a case, backpressure has been propagated, though not gracefully: the result is a failure. But the important thing to realise here is that the side that is receiving messages has been able to use backpressure to protect itself from running out of memory. Without a backpressure mechanism on the receiving side, there would have been no TCP backpressure, and it would have had to accept data at the rate produced by the remote end, potentially buffering and running out of memory, or having to fail.

So, as you can see, WebSockets don't need an explicit backpressure mechanism in the protocol; they can piggyback off TCP. And Reactive Streams can use this to ensure a consumer of a WebSocket stream on one end can push back on the producer at the other end, and have that push back propagated all the way through, albeit with a certain amount of buffering delaying the push back from happening immediately.

> Thanks.
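
As a footnote to the "buffer up to a point, and then fail" case above, this is roughly what that policy looks like on a sending side that has no backpressure mechanism. It is only a sketch under my own assumptions: BoundedSendBuffer and its methods are hypothetical names, not the API of any real WebSocket implementation, and the actual socket writes are left out so that only the policy is visible.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class for illustration; not the API of any real WebSocket implementation.
final class BoundedSendBuffer {
    private final ArrayDeque<String> pending = new ArrayDeque<>();
    private final int capacity;
    private boolean failed;

    BoundedSendBuffer(int capacity) { this.capacity = capacity; }

    /** Called by the application while the socket cannot accept more bytes. */
    boolean send(String message) {
        if (failed) return false;          // connection already torn down
        if (pending.size() == capacity) {  // buffered up to the limit: fail the connection
            failed = true;
            pending.clear();
            return false;
        }
        pending.addLast(message);
        return true;
    }

    boolean isFailed() { return failed; }

    /** Five sends against a stalled socket with room for three buffered messages. */
    static List<Boolean> run() {
        BoundedSendBuffer buffer = new BoundedSendBuffer(3);
        List<Boolean> results = new ArrayList<>();
        for (int i = 0; i < 5; i++) results.add(buffer.send("m" + i));
        results.add(buffer.isFailed());
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [true, true, true, false, false, true]
    }
}
```

The sender finds out about the slow consumer only when the connection fails, which is the ungraceful propagation described above; the receiver is protected either way.
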
>
>
>> > with the understanding that such a helper will limit the capabilities
>>>> and performance when its used - eg, the range of options for closing, range
>>>> of options for how errors are handled, and perhaps introduce some small
>>>> performance penalties such as additional allocations and/or buffer copies.
>>>>
>>>> That is a whole set of comprises and decisions that would need to
>>>> be considered.
>>>>
>>>> > The use case here is how well this API will interact with other APIs.
>>>> WebSockets are rarely used in isolation from other technologies. Most non
>>>> trivial usages of WebSockets will involve plumbing the WebSocket API to
>>>> some other source or sink of streaming data - for example, a message
>>>> broker, a database, perhaps a gRPC stream to another service.
>>>>
>>>> I could well image a set of vertical-market specific libraries being
>>>> built
>>>> on top of the WebSocket API, and that is how it should be. In this
>>>> specific area, providing the lowest level API is an enabler for others.
>>>>
>>>> > The question is, how difficult will it be to connect those two APIs?
>>>>
>>>> I imagine some work may be required, but certainly much less than
>>>> without the WebSocket API.
>>>>
>>>> > Will their backpressure mechanisms map to each other easily?
>>>>
>>>> I see no reason that it should not. Are you aware of any?
>>>>
>>>> > Will their failure handling and error handling semantics map to each
>>>> other easily?
>>>>
>>>> I see no reason that they should not. In fact, most higher-level APIs
>>>> provide more coarse grain error handling.
>>>>
>>>> > How many lines of boilerplate will it take to hook them up?
>>>>
>>>> I guess that depends on the particular library's functionality, will it
>>>> support/handle/expose partial message delivery, buffering, etc. I
>>>> don’t see this as boilerplate though.
>>>>
>>>> > If the WebSocket provides an option to use Reactive Streams, then for
>>>> any other sources/sink of streamed data that support Reactive Streams,
>>>> application developers will have an option where the answers are guaranteed
>>>> to be yes without the developer having to do any mapping themselves, and
>>>> the lines of code to do that will be one.
>>>>
>>>> Sure, if one has a higher-level library that works with Reactive
>>>> Streams,
>>>> and one wants to use WebSocket as a transport, then it would be little
>>>> work if Java SE provided a Reactive Streams interface to WebSocket.
>>>> There are also vertical-markets using WebSocket, but not using Reactive
>>>> Streams.
>>>>
>>>> At this point in JEP 321, I do not want to increase the scope of the
>>>> project
>>>> to include a Reactive Streams interface for WebSocket. This is something
>>>> that can, and should, be considered separate to JEP 321. It could follow
>>>> in a future release if deemed appropriate, or not, but adding it at
>>>> this point
>>>> will add risk to JEP 321, which I am not willing to do.
>>>>
>>>> -Chris.
>>>>
>>>>
>>>
>>
>>
>> --
>> *James Roper*
>> *Senior Octonaut*
>>
>> Lightbend <https://www.lightbend.com/> – Build reactive apps!
>> Twitter: @jroper <https://twitter.com/jroper>
>>
>
>

--
*James Roper*
*Senior Octonaut*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>