Hi Pavel,

Sorry for taking a while to respond, I've been travelling and wanted to
take the time to properly understand the concerns you've raised.

The reasoning you've stated for the current API design is all sound, and
obviously the choice to go with that design and not others depends on the
goals of the API. I don't think I'm in a position to be able to disagree
with the goals that you've set, I'm sure you've done a lot of research,
talked to a lot of people and got a lot of feedback. So I'm happy to accept
that the current API as it stands is a good one and should stay as is.

The question for me then is whether there is value in introducing an
additional higher level API (ie, a reactive streams based API) for the
purposes of integration with other libraries that provide RS support.
Introducing another API, regardless of what that API is, obviously has
drawbacks as it increases the surface area of the API, and thus complexity,
and also potentially introduces confusion for application developers as
they now have two different ways of achieving things, and they need to know
which to use when. So, the value of that API must be high enough to
outweigh those drawbacks. I strongly believe that a Reactive Streams based
API does add enough value to do that - and the post that I shared in my
previous email demonstrates how an ecosystem where reactive streams is
embraced can provide a lot of value to application developers. So I'm happy
to talk more here about use cases and code samples etc if you don't agree

To address the issues you raised - I think when providing reactive streams
integration, we need to accept that it is a high level API, and there can
be costs associated with it - performance costs, in some cases
functionality costs, etc. This is the cost of abstracting a problem away so
that many different libraries can reliably integrate with each other -
there are many libraries that work with reusable buffers, but there is no
standard for how the lifecycle of those buffers are managed, in some cases
they get reused immediately, in other cases, for example, in Netty, they
use reference counting, and transparently return the buffer to a pool when
the reference count reaches zero. The Reactive Streams spec specifically
decided not to deal with this side of integrating streaming libraries,
which means either that some out of band mechanism needs to be provided for
signalling the end of use of buffers (which puts constraints on interop
because not every RS sink/source that you might plumb in would support
integrating with that out of band mechanism), or that reusable buffers
cannot be used - in general, the latter would probably be preferred, as
high level concurrency integration code generally requires messages sent
between different components to be immutable anyway.

So, I would say that a reactive streams interface would copy the reusable
buffer into a read only immutable buffer, and that this choice of decreased
performance would be a trade off given to the application developer when
they choose between using the RS API or writing their own code to use the
proprietary API.

For signalling errors upstream, in all the cases that I've worked with,
we've never found a reason that sending the error downstream, and just
cancelling upstream, is a problem. At the end of the day, there is only one
thing that can fail, the TCP connection, and if it fails, both upstream and
downstream will fail - there is no such thing as a half failure in TCP. So
the error sent down, and the error sent up, will always represent the same
failure of the TCP socket, and probably will always be exactly the same
exception. So, if you send errors both up and down, then the application
has a problem - which one should it handle? Should it handle both? The end
effect of handling both, at a minimum, will usually mean every failure of a
WebSocket connection will result in the error appearing twice in an
applications logs, once for downstream, once for upstream, which is not
helpful because there was only actually one error. At worst, it means for
example if you have code to restart a WebSocket connection, without
carefully written concurrent code, you could end up double restarting the
streams. By only sending errors through the Subscriber interface, ie
downstream, you simplify the application handling of errors, because the
application only needs to handle the error in one place, rather than having
to make a choice between handling it in one place or another, and getting
that choice right. The issue you raised with this meaning that this means
there will be coupling between the subscriber and publisher sides - while
that's correct, it's not the error handling that would cause that. The
publisher and subscriber side are always intrinsically linked, because they
are bound to the same TCP connection - they live and die based on that TCP
connection. If one side fails, so does the other side. If one side starts,
so does the otherside. If one side decides to restart, the otherside must
restart too. They are already coupled, and cannot be decoupled. Introducing
coupling in the error handling simplifies the handling, because it exposes
what is already true.

The creation of additional objects for representing message types is of
course an overhead, but one that users of the high level API would be happy
to have given the advantages of not having to implement their own
integration code. The JVM is highly optimised to handle the creation and
garbage collection of short lived objects, and this allows high level APIs
like Reactive Streams to exploit this, allowing simpler programming models
without having to forgo a huge amount of performance. Of course, that
doesn't mean we needlessly create objects for no reason, but in this case I
believe the benefits of being able to seamlessly integrate with other
streaming libraries outweigh the overhead of an allocation or two per
message. Having one stream per message type would cause problems - for
example, the socket.io protocol encodes binary messages by first sending a
text message containing metadata, which is then followed by one or more
binary messages. Sending this in multiple different streams would make
correlation of those binary messages with the text header very difficult if
not impossible.



On 12 February 2018 at 16:33, Pavel Rappo <pavel.ra...@oracle.com> wrote:

> Hello James,
> Thanks for the comprehensive reply to Chuck's email. Now regarding your
> own email.
> > On 10 Feb 2018, at 07:38, James Roper <ja...@lightbend.com> wrote:
> >
> > <snip>
> >
> > But that brings me to a problem that I'd like to give as feedback to the
> implementers - this API is not Reactive Streams, and so therefore can't
> take advantage of Reactive Streams implementations, and more problematic,
> can't interop with other Reactive Streams sinks/sources. If I wanted to
> stream a WebSocket into a message broker that supports Reactive Streams, I
> can't. I would definitely hope that Reactive Streams support could be added
> to this API, at a minimum as a wrapper, so that application developers can
> easily focus on their business problems, plumbing and transforming messages
> from one place to another, rather than having to deal with implementing
> concurrent code to pass messages.
> >
> I totally understand your concern over the lack of a Reactive Streams
> interface
> to this low-level API. All I can say is that it's not that we haven't
> tried. As
> usual the devil is in the detail. There were at least a couple of major
> issues
> we couldn't find a satisfactory solution to.
> The first one is how to communicate errors back to the user's code that
> publishes messages to WebSocket. This issue has been extensively discussed
> here [1].
> The only signal the publisher can receive from a subscriber in the case of
> a
> failure is a cancellation signal. After this signal the subscription is
> considered cancelled. Now, there are different solutions to this problem,
> but
> none of the ones we looked at seemed comprehensive. For example, such
> errors
> can be propagated by the WebSocket to the user's subscriber receiving
> messages.
> In which case WebSocket input and output will become tied and the WebSocket
> client will seem to require both the publisher and the subscriber to be
> present
> at the same time.
> The second issue is how to communicate completion signals from processing
> individual messages. That might be handy in the case the implementation or
> the
> application decide to recycle buffers they use. With a naive RS interface
> to
> WebSocket all the user's publisher can receive from the WebSocket's
> subscriber
> is a request for a number of extra messages. Using only this number the
> publisher cannot deduce which of the previously published messages have
> been
> actually sent. This problem also has a number of solutions. One of which
> would
> be to use more streams. An extra publisher WebSocket would use to emit
> outcomes
> of sending messages and an extra subscriber WebSocket would use to receive
> signals from the user once a message has been received. To be honest it
> looks
> cumbersome.
> There are also a dozen of smaller issues that have to be resolved before
> creating a well-defined RS interface to WebSocket.
> > It may well require wrapping messages in a high level object - text,
> binary, ping, pong, etc, to differentiate between the message types.
> >
> We went great lengths in supporting different kinds of requirements in
> this API.
> One of the such requirements that arose from a number of discussions on
> this
> mailing list was not to force unnecessary allocations, garbage creation and
> copying. To this end we utilised completion stages for send/receive
> operations [2].
> We abandoned types for messages and used a method-naming scheme instead (I
> believe with RS the equivalent would be to provide a stream per message
> type).
> Even WebSocket.Listener was designed such that one instance could (if
> required)
> service many WebSocket instances. That's the reason each method in
> Listener has
> a WebSocket argument.
> > <snip>
> >
> > https://developer.lightbend.com/blog/2018-02-06-reactive-
> streams-ee4j/index.html
> >
> Thanks for the link.
> That said, I think once we have satisfactory answers to the questions
> above,
> we might provide an RS adapter you were talking about to this low-level
> API.
> Meanwhile it is possible to built one already, on top of the existing API
> even
> though it is not the same as RS, semantically they are very similar.
> Thanks,
> -Pavel
> ---------------------------------------------------
> [1] https://github.com/reactive-streams/reactive-streams-jvm/issues/271
> [2] At one point we were even considering using callbacks similar to
>     java.nio.channels.CompletionHandler instead of java.util.concurrent.
> CompletionStage
>     for both sending and receiving messages. All this is for the sake of
> not
>     creating extra objects when this is considered expensive. Who knows, we
>     might come back to retrofit the API and add this capability later.

*James Roper*
*Senior Octonaut*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>

Reply via email to