Re: Add convenience collect methods to the Stream interface

2018-12-09 Thread James Roper
Just wanted to put my hand up to say that I have had the same experience,
more than 50% of the time that I've used JDK8 streams I've used
.collect(Collectors.toList()).

But, as a counterpoint, let me speculate that many or most uses of streams
fall into two broad use cases. One is for actual stream processing, where
you aren't necessarily dealing with a stream that is materialized in
memory, where you may want the power of parallel processing, where you
certainly do want multiple stages of the stream processing chained together
rather than having each stage materialize a new collection in memory, and
this is what the stream API was created for. In this case, collecting to a
list is common but not that common, perhaps not common enough to warrant a
convenience method on Stream.

The other broad use case category is to work with Lists and Sets in a
functional programming style. In this case, you simply want to map, filter,
flatMap etc every element in your list/set, and this is where collecting to
a list and set is almost always the terminal operation for a stream.
Although you can use the stream API to do this, I speculate that it wasn't
primarily designed for this, and I think if the JDK wanted to provide first
class support for doing this, there would be better ways to go about it -
for example, by placing map/filter/flatMap methods directly on the
Collection interfaces and returning the collections directly, rather than
having to convert to a stream and back. Going through a stream still has
its advantages (eg, it uses less memory since you don't need to materialize
a new collection between each stage of the processing), but lacks the
convenience that other collection APIs that offer true functional
programming for Java have (eg, vavr, formerly javaslang).
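
As a rough illustration of that second use case, the sketch below contrasts the stream round trip with collection-level helpers. ListOps, map and filter are hypothetical names used only for this example; they are not JDK or vavr API, and they simply wrap the stream pipeline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helpers: not part of the JDK, only an illustration.
final class ListOps {
  private ListOps() {}

  // Maps every element and returns a new list, hiding the stream round trip.
  static <T, R> List<R> map(List<T> in, Function<? super T, ? extends R> f) {
    return in.stream().map(f).collect(Collectors.toList());
  }

  // Keeps only the elements matching the predicate, again as a new list.
  static <T> List<T> filter(List<T> in, Predicate<? super T> p) {
    return in.stream().filter(p).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("ann", "bob", "christine");

    // What has to be written today: convert to a stream and back.
    List<Integer> viaStream = names.stream()
        .map(String::length)
        .filter(n -> n > 3)
        .collect(Collectors.toList());

    // The same result through the hypothetical collection-level helpers.
    List<Integer> lengths = map(names, String::length);
    List<Integer> viaHelpers = filter(lengths, n -> n > 3);

    System.out.println(viaStream.equals(viaHelpers));
  }
}
```

Note that the helper version materializes an intermediate list (lengths) between the two stages, which is the memory cost mentioned above.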

And, if the JDK was to start adding features for functional programming
like that to its collection API, then it may be worth considering other
features, such as persistent collections, that is, collections where
operations such as add/remove aren't destructive, ie, immutable collections
that support efficient (constant time) append/prepend operations that
return a copy of the list. My point is that if the JDK does start to
consider adding functional programming to its collection APIs, a
holistic approach should be taken to work out what things would look like
long term, even if not all of those features are added immediately (or
ever).
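
To make "persistent collection" concrete, here is a minimal sketch of an immutable singly linked list. PList is a hypothetical type, not a JDK class. It only gives constant-time prepend; constant-time append would need a different structure.

```java
import java.util.NoSuchElementException;

// Minimal persistent (immutable) singly linked list; hypothetical, not a JDK type.
final class PList<T> {
  private static final PList<Object> EMPTY = new PList<>(null, null, 0);

  private final T head;
  private final PList<T> tail;
  private final int size;

  private PList(T head, PList<T> tail, int size) {
    this.head = head;
    this.tail = tail;
    this.size = size;
  }

  @SuppressWarnings("unchecked")
  static <T> PList<T> empty() {
    return (PList<T>) EMPTY; // safe: the empty list holds no elements
  }

  // O(1): the new list shares the whole existing list as its tail.
  PList<T> prepend(T value) {
    return new PList<>(value, this, size + 1);
  }

  boolean isEmpty() {
    return size == 0;
  }

  int size() {
    return size;
  }

  T head() {
    if (isEmpty()) {
      throw new NoSuchElementException("empty list");
    }
    return head;
  }

  PList<T> tail() {
    if (isEmpty()) {
      throw new NoSuchElementException("empty list");
    }
    return tail;
  }
}
```

Prepending returns a new list and leaves the original untouched, so both versions can be used safely at the same time.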

On Mon, 10 Dec 2018 at 10:04, Rob Griffin (rgriffin) 
wrote:

> Hi,
>
> I have raised an enhancement request (Incident Report 913453) about adding
> some convenience methods to the Stream interface that collect the stream
> and Pallavi Sonal asked me to start a thread here about that.
>
> More than 50% of our Stream collect calls use Collectors.toList() or
> Collectors.toSet() as arguments so I think it would be very handy if the
> Stream interface had default collectToList, collectToSet and
> collectToMap methods.
>
> The advantages are:
> * it would be easier to use code completion in IDEs. There are a lot
> of classes starting with Collect so finding the Collectors class is a bit
> of a pain.
> * one less method call in what is usually a long chain of calls.
>
> Regards,
>
> Rob Griffin
> Software Analyst, Spotlight on SQL Server
> Quest | R
> rob.grif...@quest.com
>
>

-- 
James Roper
Architect, Office of the CTO, Lightbend, Inc.
@jroper <https://twitter.com/jroper>

<https://www.lightbend.com/>


Re: Stream Method Proposal: long count(Predicate predicate)

2018-11-11 Thread James Roper
Another reason to prefer a smaller API is that it can aid the ability to
comprehend, since there are fewer concepts to understand. The idea behind
design patterns is that APIs constrain themselves to just using the well
established and understood patterns, which means when someone who has never
seen the API before comes and sees code that uses it, they can understand
the code. But that only works if APIs do constrain themselves to the design
patterns - constraint is key to the advantageous application of design
patterns to APIs. java.util.stream does not sit by itself: it is one of
hundreds of APIs on the JVM alone, and counting other languages it's among
thousands of APIs that offer a functional API with filter and count
abstractions. All of these APIs share filter/count as well established,
well understood, instantly readable and understandable concepts. You don't
have to know anything about java.util.stream to be able to understand
exactly what filter(predicate).count() is doing. And this bootstrapping off
this large ecosystem of APIs is one of the things that make
java.util.stream a good API. But if it were to add count/findFirst variants
that add predicates? It's not unprecedented, but it is by far less common,
which makes it less understandable. That doesn't mean we never add
convenience methods, but they do have to add a high degree of convenience
to diverge from the well established patterns, and in this case, the
convenience added is only small.
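
For comparison, a small sketch of the two spellings. The count(items, predicate) helper is hypothetical and only stands in for the proposed overload; it is not an existing Stream method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class CountDemo {
  private CountDemo() {}

  // Hypothetical helper standing in for the proposed count(Predicate) overload.
  static <T> long count(List<T> items, Predicate<? super T> p) {
    return items.stream().filter(p).count();
  }

  public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

    // The established pattern: readable without knowing java.util.stream.
    long viaFilter = numbers.stream().filter(n -> n % 2 == 0).count();

    // The convenience variant saves a single call.
    long viaHelper = count(numbers, n -> n % 2 == 0);

    System.out.println(viaFilter == viaHelper);
  }
}
```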

On Fri, 9 Nov 2018 at 04:39, Roger Riggs  wrote:

> Hi Jacob,
>
> It's hard to resist the urge to add convenience methods, they look nice
> and help a few developers.
> However, they accumulate rapidly and end up obscuring the core
> functionality.
> They can hurt comprehension since they fold different functions together
> and the collective API surface area ends up impinging on every
> developer's learning curve.
>
> $.02, Roger
>
>
> On 11/07/2018 08:00 PM, Jacob Glickman wrote:
> >   Hello!
> >
> > I see myself having to often call count() as a terminal operation on a
> > Stream immediately after performing a filter operation. How feasible would
> > it be to add an overloaded count() method that accepts a Predicate, which
> > it uses as a filter before returning the count of elements in the Stream?
> > If this is supported, I'd gladly create the webrev & tests for it!
> >
> > I suppose the method signature can be something along the lines of:
> >
> >  long count(Predicate predicate)
> >
> > It would also seem reasonable to give this method to IntStream,
> > DoubleStream, and LongStream, but allowing them to use IntPredicate,
> > DoublePredicate, and LongPredicate respectively.
> >
> > Thanks,
> >
> > Jacob Glickman
>
>



Why Stream.concat is a static method - type variable contravariance

2018-10-10 Thread James Roper
With the work I'm doing at the moment at creating a Reactive Streams
equivalent to java.util.stream, I've often wondered why Stream.concat is a
static method, rather than an instance method concating the given stream
onto this. But I think the reason has just dawned on me, and I wanted to
confirm that I'm correct.

Java doesn't support contravariant type variables - it does for type
declarations, but not type variables.

To put it more concretely, if I had a Stream<Integer>, and I wanted to concat
a Stream<Number>, this is a valid thing to do, the resulting stream would
be Stream<Number>. But doing that with an instance method would require
something like this:

public <S super T> Stream<S> concat(Stream<? extends S> b);

Which is not supported (specifically, the <S super T> type variable
declaration is not supported). In contrast, what we have in the actual API:

public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b);

does allow me to concat a Stream<Integer> and Stream<Number> with a
resulting type of Stream<Number>.
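
A minimal sketch of the static form in action, assuming a subtype/supertype pair such as Integer and Number for the two streams. ConcatDemo and concatMixed are names made up for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ConcatDemo {
  private ConcatDemo() {}

  static List<Number> concatMixed() {
    Stream<Integer> ints = Stream.of(1, 2);
    Stream<Number> nums = Stream.<Number>of(3.5, 4L);

    // Both parameters are Stream<? extends T>, so T can be inferred as Number.
    Stream<Number> all = Stream.concat(ints, nums);

    // An instance method ints.concat(nums) returning Stream<Number> would need
    // a lower-bounded type variable (<S super T>), which Java does not allow.
    return all.collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(concatMixed());
  }
}
```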

Is this right, or are there other reasons? Also, is there any possibility
that Java might support contravariance in type variables in future? My
reason for wanting it is to provide the following method for reactive
streams:

public <S super T> Publisher<S> onErrorResumeWith(Function<? super Throwable, ? extends Publisher<? extends S>> f);

The intent of this method is when a stream encounters an error, the passed
function is invoked with the error, and that function returns a publisher
that gets concated to the current stream instead of the error being
emitted. This could possibly be implemented with a static method:

public static <T> Publisher<T> onErrorResumeWith(Publisher<? extends T> a,
Function<? super Throwable, ? extends Publisher<? extends T>> f);

But unlike concat, this method feels and reads much better as an instance
method, as a static method it's a little confusing.

Regards,

James

-- 
*James Roper*
*Senior Developer, Office of the CTO*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>


Re: Reactive Streams utility API

2018-09-25 Thread James Roper
Sounds good to me, I'll ping Viktor to make sure he sees it too.

On Wed, 26 Sep 2018 at 06:34, Pavel Rappo  wrote:

>
> > On 25 Sep 2018, at 05:33, James Roper  wrote:
> >
> > Hi Pavel,
> >
> > 
> >
> > As for the MutexExecutor itself, that was mostly written by Viktor
> Klang, and I believe he wrote it based on his experience implementing
> similar constructs for Akka mailboxes. There is one major problem with it
> that I'm aware of - it's not fair on the underlying executor. If you submit
> tasks at an equal or higher rate than can be processed by a single thread,
> the executor will never return the thread it uses to the underlying
> executor. I don't think that's hard to fix - we could limit the number of
> sequential tasks it does on a thread before resubmitting to the underlying
> executor.
> >
> > A slightly different incarnation of this problem is when each task
> invoked resubmits another task, for example, using the current reactive
> streams API, if I did ReactiveStreams.generate(() ->
> "foo").forEach(System.out::println), that is by design effectively an
> infinite loop that prints out foo, but being an asynchronous API it
> shouldn't actually be an infinite loop, it should return the thread back to
> the underlying executor at least periodically to allow that thread to be
> used for other tasks queued on the executor, but it doesn't do that.
> >
> > But this issue (and some similar issues that may exist) we haven't begun
> to consider addressing, primarily because fixing it requires selecting some
> magic numbers for limits on work to do, and they can't be selected without
> some realistic benchmarks being created to tune them to, and we're just not
> ready to take this implementation to that level, it could change very
> significantly which would change all the assumptions before it's ready to
> be used.
> >
>
> James, thanks for such a detailed explanation! Right now I'm mostly
> interested
> in mechanics of this executor. I think this discussion deserves a separate
> thread on concurrency-interest mailing list. What do you think? Once I've
> started the thread there, I will get back here with the link to it
> (for the interested parties' convenience).
>
> -Pavel
>
>



Re: Reactive Streams utility API

2018-09-24 Thread James Roper
Hi Pavel,

MicroProfile doesn't do implementations, or even have a concept of a
reference implementation, which is why that executor isn't there. It is
however in one of the implementations of the spec that we've created at
Lightbend:

https://github.com/lightbend/microprofile-reactive-streams/blob/master/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/MutexExecutor.java

This implementation is being called a "zero dependency" implementation,
since it doesn't bring in any other dependencies, unlike the implementation
that we are going to use ourselves, which uses Akka Streams. The zerodep
implementation isn't production ready, our main reason for providing it is
as a candidate starting point for a reference implementation in the JDK.
But I also don't think it's that bad an implementation, it's just that
unlike the Akka Streams based implementation (or any of the other
implementations, eg RedHat's one built on RxJava and Vert.x), it has not
been proven through any real world use.

As for the MutexExecutor itself, that was mostly written by Viktor Klang,
and I believe he wrote it based on his experience implementing similar
constructs for Akka mailboxes. There is one major problem with it that I'm
aware of - it's not fair on the underlying executor. If you submit tasks at
an equal or higher rate than can be processed by a single thread, the
executor will never return the thread it uses to the underlying executor. I
don't think that's hard to fix - we could limit the number of sequential
tasks it does on a thread before resubmitting to the underlying executor.
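
The fix suggested above could look roughly like the following. This is a hypothetical sketch written for illustration, not the actual MutexExecutor code; the class name BatchingSerialExecutor and the maxBatch parameter are assumptions, and handling of a rejecting underlying executor is left out.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: runs tasks one at a time, but hands the thread back
// to the underlying executor after at most maxBatch tasks.
final class BatchingSerialExecutor implements Executor {
  private final Executor underlying;
  private final int maxBatch; // the kind of limit that would need benchmarking
  private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
  private final AtomicBoolean scheduled = new AtomicBoolean(false);

  BatchingSerialExecutor(Executor underlying, int maxBatch) {
    if (maxBatch < 1) {
      throw new IllegalArgumentException("maxBatch must be >= 1");
    }
    this.underlying = underlying;
    this.maxBatch = maxBatch;
  }

  @Override
  public void execute(Runnable task) {
    queue.add(task);
    trySchedule();
  }

  // Submits a drain run only if work is pending and none is scheduled yet.
  private void trySchedule() {
    if (!queue.isEmpty() && scheduled.compareAndSet(false, true)) {
      underlying.execute(this::drain);
    }
  }

  private void drain() {
    try {
      for (int i = 0; i < maxBatch; i++) {
        Runnable next = queue.poll();
        if (next == null) {
          break;
        }
        next.run(); // an exception ends this batch; finally still reschedules
      }
    } finally {
      // Release the slot, then resubmit if tasks remain, so other work queued
      // on the underlying executor gets a turn between batches.
      scheduled.set(false);
      trySchedule();
    }
  }
}
```

With maxBatch set to 2 and five queued tasks, the underlying executor sees three separate submissions instead of one long-running one.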

A slightly different incarnation of this problem is when each task invoked
resubmits another task, for example, using the current reactive streams
API, if I did ReactiveStreams.generate(() ->
"foo").forEach(System.out::println), that is by design effectively an
infinite loop that prints out foo, but being an asynchronous API it
shouldn't actually be an infinite loop, it should return the thread back to
the underlying executor at least periodically to allow that thread to be
used for other tasks queued on the executor, but it doesn't do that.

But this issue (and some similar issues that may exist) we haven't begun to
consider addressing, primarily because fixing it requires selecting some
magic numbers for limits on work to do, and they can't be selected without
some realistic benchmarks being created to tune them to, and we're just not
ready to take this implementation to that level, it could change very
significantly which would change all the assumptions before it's ready to
be used.

Cheers,

James

On Fri, 21 Sep 2018 at 21:16, Pavel Rappo  wrote:

> Hi James,
>
> It sounds like the project is being very active. Glad to hear that! No
> matter
> how good a mailing list is, in practice, it cannot be the best mailing
> list for
> every topic there is. core-libs-dev is no exception. It's good to see
> you've
> found another way to make progress on the project.
>
> The question I was going to ask you was on the implementation internals
>
> org.reactivestreams.utils.impl.MutexExecutor
>
> However, after examining the new repository, I found that not only has the
> code
> in question disappeared, but the structure of the project has changed as
> well.
>
> Will the API have the default implementation (ReactiveStreamsEngine)?
>
> -Pavel
>
>



Re: Reactive Streams utility API

2018-09-18 Thread James Roper
Hi Pavel,

Thanks for asking about this. I meant to send an update to the list
earlier but never got around to it.

As you probably noticed from the history of this thread, there was a
deafening silence in response to it. We decided that core-libs-dev might
not have been the best place to start this effort. So we've taken it to
Eclipse MicroProfile, and the intention is to incubate it there, and then
once we have something that we're reasonably happy with, look at
contributing that to the JDK. That strategy has so far gone really well.
The project lives here:

https://github.com/eclipse/microprofile-reactive-streams

We have a specification:

http://download.eclipse.org/microprofile/microprofile-reactive-1.0-M1/microprofile-reactive-streams-operators-spec.html

A TCK (see the TCK section of the spec for info).

Comprehensive API docs:

http://download.eclipse.org/microprofile/microprofile-reactive-1.0-M1/apidocs/

And all this has been produced by a team of around 10 contributors from
five different companies (Lightbend, RedHat, IBM, Payara and Tomitribe),
with all meetings recorded and minuted:

https://docs.google.com/document/d/1UKBBNFn-CeMih3lNUGvpShcL2aTpnvmJs9LQ8qT9OI0/edit#

We are about to release a 1.0-RC1, and are planning to have a 1.0 out in
time for Oracle Code One, where there will be some presentations on it.

My plan was, once 1.0 was released, to resume discussions on this list
about whether this could be contributed to the JDK, possibly as an
incubating module.

Are there any questions in particular that you have about it? If you're
interested, we have an open meeting every Tuesday, 1100 UTC, that you're
welcome to join (actually next week we're expecting some other Oracle devs
to join from the team that produced Helidon). A link to the calendar event
for this meeting, with links to the Zoom meeting and minutes, can be found
here:

https://calendar.google.com/calendar/event?eid=NDk3ZjQxMjkyZGh1N2lrZnU2dTk0Y29laWtfMjAxODA5MjVUMTEwMDAwWiBnYm5iYzM3M2dhNDBuMHR2Ymw4OG5rYzNyNEBn=Etc/GMT

Cheers,

James

On Tue, 28 Aug 2018 at 00:33, Pavel Rappo  wrote:

> Hi James,
>
> What's the status of the project? Has it been abandoned?
>
> -Pavel
>
> > On 22 Mar 2018, at 00:09, James Roper  wrote:
> >
> > Hi all,
> >
> > We've created a reference implementation. It's been done in such a way
> that
> > implementation of new features (stages) is quite straight forward, there
> is
> > an abstraction that does the hard work of handling concurrency, the
> > reactive streams spec conformance, and managing demand and buffering, and
> > error handling, and then individual stages (eg, map, filter, flatMap) are
> > implemented using a very easy to use API (note, this API/abstraction is
> all
> > private, internal to the reference implementation). Rudimentary tests on
> > performance show that it's not terrible compared to other reactive
> streams
> > implementations, with a number of clear optimization paths identified
> > should we decide that's necessary. I believe this proposal is now close
> to
> > complete - the remaining work is:
> >
> > * Decide what scope beyond JDK8 streams it should support - while this
> > decision is not trivial, the amount of work required to actually add
> these
> > to the API and implement in the reference implementation is trivial.
> > * Fill out the TCK with more rigorous verification.
> > * Create some rigorous benchmarks.
> >
> > I'm not sure what should be done next. I have talked to a number of
> people
> > who are either involved in, or are writing APIs that use Reactive Streams
> > in private, and interest seems high. Also, there is general consensus in
> > public discussions in the Jakarta EE/MicroProfile communities that an API
> > like this would be very valuable in the JDK. The API of course could be
> > done in Jakarta EE instead, but given that Reactive Streams is part of
> and
> > used by the JDK, and given that the JDK8 Streams API is part of the JDK,
> > Jakarta EE would seem an odd place to put this library - it essentially
> > would mean that to make effective use of JDK libraries that use Reactive
> > Streams (eg HTTP client, possibly java.sql2 aka ADBA), you need to use
> > Jakarta EE (or some third party streaming library).
> >
> > So unless there's any major feedback coming here on this list, I'd like
> to
> > put this forward as a JEP.
> >
> > Regards,
> >
> > James
> >
> > On 15 March 2018 at 12:24, James Roper  wrote:
> >
> >> Hi all,
> >>
> >> An update on this. We've now filled out the API with feature parity with
> >> the JDK8 Streams API - for operators that make sense in Reactive
> Streams.
> >> We'

Should Java have a unit type?

2018-05-09 Thread James Roper
Hi all,

I'm interested in opinions on whether Java should have a unit type. I
apologise if this has already been discussed on this list before, I did
search for it but didn't find anything.

By unit type, I'm referring to a type that represents no value.  When a
Java method returns no value, void is used, however, void is not a type.

java.lang.Void exists as a placeholder class for use in reflection, but it
doesn't seem that it's meant to be used directly as a type.

In spite of this, java.lang.Void is used in some places in the JDK as a
type, for example, it's used by CompletionStage in methods such as
thenAccept to represent an operation that can complete or fail in future
but has no value:

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#thenAccept-java.util.function.Consumer-

Working with it in this way is a little clunky and counterintuitive. Void
is uninstantiable, so how can a CompletionStage<Void> ever be redeemed? The
answer is, it gets redeemed with null. Are null and void the same thing?
Apparently, according to this API, but they aren't really, null represents
the absence of a value that could exist, and is treated as such in APIs
like java.util.Optional, which is a different thing from no value.

Now perhaps the use of Void in this way by CompletionStage means the horse
has already bolted, and that now Void is a unit type with a singleton
instance represented by null, by default.

But in my opinion, it really isn't ideal. One issue is that it means any
API that uses Void types won't play well with other APIs that don't accept
nulls - a consequence of void and null being different things. A big one in
this case is Reactive Streams, onNext does not accept null, so if you
asynchronously map a reactive stream, and the asynchronous operation you do
returns CompletionStage<Void> (this is actually a common thing to do, eg,
if you're doing a sequence of operations, and the last one is a commit, it
is becoming common practice for asynchronous commit methods to return
CompletionStage<Void>), then you need to thenApply that CompletionStage to
some other non null unit type before you can emit it to the stream.
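
A small sketch of that workaround, assuming a hand-rolled unit type. The Done enum and the toDone helper are hypothetical names for this example and are not JDK classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical unit type with a single non-null instance; not a JDK class.
enum Done { INSTANCE }

final class UnitDemo {
  private UnitDemo() {}

  // Replaces whatever the stage completes with (null for Void) by a non-null unit value.
  static <T> CompletionStage<Done> toDone(CompletionStage<T> stage) {
    return stage.thenApply(ignored -> Done.INSTANCE);
  }

  public static void main(String[] args) {
    // Stand-in for an asynchronous commit that returns CompletionStage<Void>.
    CompletionStage<Void> commit = CompletableFuture.runAsync(() -> { });

    // The Void stage is redeemed with null, which onNext would reject.
    System.out.println(commit.toCompletableFuture().join());

    // After mapping, there is a real value that can be emitted to a stream.
    System.out.println(toDone(commit).toCompletableFuture().join());
  }
}
```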

So it's my opinion that Java should have a unit type. This could be done by
promoting Void to a unit type, not just a placeholder type, and giving it a
singleton instance. That approach would mean existing uses of Void as a
unit type would be source compatible, and can migrate to using the instance
instead of null.

Another approach would be to introduce a new type, perhaps java.lang.Unit.
Both Scala and Kotlin have their own unit types, scala.Unit and kotlin.Unit
respectively. Akka did something a little more interesting, it has two unit
types, used to indicate two different intents, the first is NotUsed, which
is used to indicate that a type parameter has no significant meaning, and
the second is Done, which is used to indicate that a type parameter of this
type indicates that whatever it represents has been completed successfully
- this type is typically used in combination with CompletionStage, while
NotUsed might be used in place of Void in PrivilegedAction, as described
here:

https://docs.oracle.com/javase/8/docs/technotes/guides/security/doprivileged.html

Regards,

James

-- 
*James Roper*
*Senior Octonaut*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>


Multi-Release jars intended use cases

2018-03-25 Thread James Roper
Hi all,

According to Alan Bateman, MR jars can be used to introduce JDK9 API
features on classes, such that javac can compile code against them to use
the JDK9 specific API provided by the JDK9 version of the class, and still
have the same classes be compatible with (and loadable by) JDK8:

http://kto.so/2017/09/30/the-practical-truth-about-multi-release-jars/#comment-3563924109

My question is whether this is a recommended practice or not. Let me put
forward a practical example of a place where we might want to use this.

The 4 Reactive Streams interfaces (Publisher, Subscriber, Processor and
Subscription) were first available in the org.reactivestreams project,
under the package name org.reactivestreams. They were then added to JDK9,
as inner interfaces of java.util.concurrent.Flow. Would it be considered a
good use of MR jars to make the org.reactivestreams interfaces extend their
juc.Flow counterparts when compiling and running on JDK9+? So, for example,
in JDK8 and earlier you get:

package org.reactivestreams;

public interface Publisher<T> {
  void subscribe(Subscriber<? super T> s);
}

While on JDK9, using the MR feature, you get this:

package org.reactivestreams;

import java.util.concurrent.Flow;

public interface Publisher<T> extends Flow.Publisher<T> {
  void subscribe(Subscriber<? super T> s);
  default void subscribe(Flow.Subscriber<? super T> s) {
    if (s instanceof Subscriber) {
      this.subscribe((Subscriber<? super T>) s);
    } else {
      this.subscribe(new FlowSubscriberAdapter<>(s));
    }
  }
}

The advantage of this approach is that it means if you have a library that
produces org.reactivestreams.Publisher, then without any changes to that
library to support JDK9, you can pass it to another library (eg, the JDK9
HTTP client) that accepts Flow.Publisher. Of course, it's not a panacea to
solving the migration problem of org.reactivestreams to juc.Flow, but it
goes a long way to helping.
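
The FlowSubscriberAdapter used in the example above is not defined anywhere in this message, so here is one way it might look. This is a sketch under assumptions: RsSubscription and RsSubscriber are local stand-ins mirroring the org.reactivestreams interfaces so that the code compiles without that dependency, and it needs JDK9+ for java.util.concurrent.Flow.

```java
import java.util.Objects;
import java.util.concurrent.Flow;

// Stand-ins mirroring org.reactivestreams.Subscription and Subscriber,
// declared here only so that the sketch is self-contained.
interface RsSubscription {
  void request(long n);
  void cancel();
}

interface RsSubscriber<T> {
  void onSubscribe(RsSubscription s);
  void onNext(T t);
  void onError(Throwable t);
  void onComplete();
}

// Hypothetical adapter: presents a Flow.Subscriber as a Reactive Streams subscriber.
final class FlowSubscriberAdapter<T> implements RsSubscriber<T> {
  private final Flow.Subscriber<? super T> delegate;

  FlowSubscriberAdapter(Flow.Subscriber<? super T> delegate) {
    this.delegate = Objects.requireNonNull(delegate);
  }

  @Override
  public void onSubscribe(RsSubscription s) {
    // Wrap the subscription in the other direction so demand flows upstream.
    delegate.onSubscribe(new Flow.Subscription() {
      @Override
      public void request(long n) {
        s.request(n);
      }

      @Override
      public void cancel() {
        s.cancel();
      }
    });
  }

  @Override
  public void onNext(T t) {
    delegate.onNext(t);
  }

  @Override
  public void onError(Throwable t) {
    delegate.onError(t);
  }

  @Override
  public void onComplete() {
    delegate.onComplete();
  }
}
```

Since the two sets of interfaces have identical method shapes, the adapter is pure delegation; the only wrapping needed is for the subscription passed to onSubscribe.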

But of course, while the JDK *does* support this, that doesn't necessarily
mean that we *should* use it that way, especially if it wasn't intended to
be used in that way. Tooling, such as IDEs and compilers for other JVM
languages, probably won't support this out of the box and may still need to
catch up, and if it was never intended to be used in that way in the first
place, then the tooling may decide not to implement it to work in that way.

So it would be nice to have an official statement about whether the above
use case is using the MR feature as intended, and whether that use case is
considered a good practice or not.

Regards,

James



Re: Reactive Streams utility API

2018-03-21 Thread James Roper
Hi all,

We've created a reference implementation. It's been done in such a way that
implementation of new features (stages) is quite straightforward: there is
an abstraction that does the hard work of handling concurrency, the
reactive streams spec conformance, and managing demand and buffering, and
error handling, and then individual stages (eg, map, filter, flatMap) are
implemented using a very easy to use API (note, this API/abstraction is all
private, internal to the reference implementation). Rudimentary tests on
performance show that it's not terrible compared to other reactive streams
implementations, with a number of clear optimization paths identified
should we decide that's necessary. I believe this proposal is now close to
complete - the remaining work is:

* Decide what scope beyond JDK8 streams it should support - while this
decision is not trivial, the amount of work required to actually add these
to the API and implement in the reference implementation is trivial.
* Fill out the TCK with more rigorous verification.
* Create some rigorous benchmarks.

I'm not sure what should be done next. I have talked to a number of people
who are either involved in, or are writing APIs that use Reactive Streams
in private, and interest seems high. Also, there is general consensus in
public discussions in the Jakarta EE/MicroProfile communities that an API
like this would be very valuable in the JDK. The API of course could be
done in Jakarta EE instead, but given that Reactive Streams is part of and
used by the JDK, and given that the JDK8 Streams API is part of the JDK,
Jakarta EE would seem an odd place to put this library - it essentially
would mean that to make effective use of JDK libraries that use Reactive
Streams (eg HTTP client, possibly java.sql2 aka ADBA), you need to use
Jakarta EE (or some third party streaming library).

So unless there's any major feedback coming here on this list, I'd like to
put this forward as a JEP.

Regards,

James

On 15 March 2018 at 12:24, James Roper <ja...@lightbend.com> wrote:

> Hi all,
>
> An update on this. We've now filled out the API with feature parity with
> the JDK8 Streams API - for operators that make sense in Reactive Streams.
> We've provided example implementations of the API backed by both Akka
> Streams and rxjava, showing that it can be widely implemented. The TCK
> still needs some work, but covers the major features, and comprehensively
> covers all publishers/subscribers with the Reactive Streams TCK (so
> comprehensive that we actually found two Reactive Streams TCK violations in
> Akka Streams with this, and a couple in rxjava too).
>
> There are two major areas of work left to get something that would be
> ready to be a candidate for a final API. The first is to produce a zero
> dependency reference implementation of the API. This is what I plan on
> starting on next.
>
> The second is to decide what additional operators and generators the API
> should provide. So far, the scope has been mostly limited to a subset of
> the JDK8 Streams scope, only a few additional API pieces have been created,
> such as a few variations on flatMap (one that supports CompletionStage, and
> one that supports Iterable). There are a number of other features for
> consideration to provide basic necessary functionality for this API, here's
> some examples off the top of my head (some of these can already be
> implemented in terms of other stages already provided):
>
> * A cancelled subscriber (useful for cleaning up hot publishers)
> * An ignoring subscriber (useful when you do the actual work in a previous
> stage of the graph, such as mapping to completion stages)
> * Error handling subscribers and/or processors
> * Termination listening subscribers and/or processors
> * A processor that wraps a subscriber and publisher
> * The ability to merge streams - so far only concat is provided, and all
> flatMaps are essentially concatenations, merge variants may be useful
> (though introduce a lot of complexity, such as specifying breadth)
> * The ability to split streams into sub streams - a use case for this is
> in parsing a stream that contains potentially large sub streams, like
> parsing a multipart/form-data body
> * Batching of elements in a stream, based on predicate, influenced by
> backpressure or based on a scheduled tick
> * Scheduled features such as emitting ticks, rate limiting, etc
> * The ability to control buffering and asynchronous boundaries within a
> graph
> * Naming of stages for debug/error reporting/monitoring purposes
>
> Not all of the above may be absolutely necessary, but should be
> considered, and there may be other features as well that would be useful to
> consider.
>
> Please visit the repo and any feedback would be much appreciated:
>
> https://github.com/lightbend/rea

Re: Reactive Streams utility API

2018-03-14 Thread James Roper
Hi all,

An update on this. We've now filled out the API with feature parity with
the JDK8 Streams API - for operators that make sense in Reactive Streams.
We've provided example implementations of the API backed by both Akka
Streams and rxjava, showing that it can be widely implemented. The TCK
still needs some work, but covers the major features, and comprehensively
covers all publishers/subscribers with the Reactive Streams TCK (so
comprehensive that we actually found two Reactive Streams TCK violations in
Akka Streams with this, and a couple in rxjava too).

There are two major areas of work left to get something that would be ready
to be a candidate for a final API. The first is to produce a zero
dependency reference implementation of the API. This is what I plan on
starting on next.

The second is to decide what additional operators and generators the API
should provide. So far, the scope has been mostly limited to a subset of
the JDK8 Streams scope, only a few additional API pieces have been created,
such as a few variations on flatMap (one that supports CompletionStage, and
one that supports Iterable). There are a number of other features for
consideration to provide basic necessary functionality for this API, here's
some examples off the top of my head (some of these can already be
implemented in terms of other stages already provided):

* A cancelled subscriber (useful for cleaning up hot publishers)
* An ignoring subscriber (useful when you do the actual work in a previous
stage of the graph, such as mapping to completion stages)
* Error handling subscribers and/or processors
* Termination listening subscribers and/or processors
* A processor that wraps a subscriber and publisher
* The ability to merge streams - so far only concat is provided, and all
flatMaps are essentially concatenations; merge variants may be useful
(though they introduce a lot of complexity, such as specifying breadth)
* The ability to split streams into sub streams - a use case for this is in
parsing a stream that contains potentially large sub streams, like parsing
a multipart/form-data body
* Batching of elements in a stream, based on predicate, influenced by
backpressure or based on a scheduled tick
* Scheduled features such as emitting ticks, rate limiting, etc
* The ability to control buffering and asynchronous boundaries within a
graph
* Naming of stages for debug/error reporting/monitoring purposes
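
As a rough illustration of the first two items, a cancelled subscriber and
an ignoring subscriber can be sketched directly against JDK9 juc.Flow. The
class and method names below are made up for this example and are not taken
from the proposed API; the `done` future is likewise just one assumed way of
reporting termination.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

// Hypothetical names, for illustration only; not the proposal's API.
final class SubscriberSketches {

    /** A subscriber that cancels as soon as it is subscribed. */
    static <T> Flow.Subscriber<T> cancelled() {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.cancel(); }
            @Override public void onNext(T item) { /* nothing is ever requested */ }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
    }

    /** A subscriber that requests everything and discards every element. */
    static final class Ignoring<T> implements Flow.Subscriber<T> {
        /** Completes when the stream terminates, normally or with an error. */
        final CompletableFuture<Void> done = new CompletableFuture<>();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { /* element dropped on purpose */ }
        @Override public void onError(Throwable t) { done.completeExceptionally(t); }
        @Override public void onComplete() { done.complete(null); }
    }
}
```

Exposing termination as a future keeps the ignoring subscriber useful when
the real work happens in an earlier stage and the caller only needs to know
when the stream has finished.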

Not all of the above may be absolutely necessary, but should be considered,
and there may be other features as well that would be useful to consider.

Please visit the repo; any feedback would be much appreciated:

https://github.com/lightbend/reactive-streams-utils

Regards,

James

On 8 March 2018 at 03:59, Brian Goetz <brian.go...@oracle.com> wrote:

> To answer the questions at the bottom: the next step is to start working
> on this and get folks excited about contributing.  There's plenty of time
> for process later, but filing a JEP or creating a project shouldn't be a
> barrier to innovating.
>
>
> On 2/28/2018 10:33 PM, James Roper wrote:
>
>> Hi all,
>>
>> We've put together a simple proposal for this. Please read the README for
>> an introduction to this proposal.
>>
>> https://github.com/lightbend/reactive-streams-utils
>>
>> Regards,
>>
>> James
>>
>> On 22 February 2018 at 11:47, James Roper <ja...@lightbend.com> wrote:
>>
>>> Hi all,
>>>
>>> This is an email to give people a heads up that we'd like to look at
>>> creating an API, in the same vein as the JDK8 Streams API, for building
>>> reactive streams (a la JDK9 juc.Flow). Our goals for this are:
>>>
>>> * To fill a gap in the JDK where if a developer wants to do even the
>>> simplest of things with a JDK9 juc.Flow, such as map or filter, they need
>>> to bring in a third party library that implements that.
>>> * To produce an API that can build Publishers, Subscribers, Processors,
>>> and complete graphs, for the purposes of consuming APIs that use reactive
>>> streams (for example, JDK9 Http Client).
>>> * To produce an API that aligns closely with ju.stream.Stream, using it
>>> for inspiration for naming, scope, general API shape, and other aspects.
>>> The purpose of this goal is to ensure familiarity of Java developers with
>>> the new API, and to limit the number of concepts Java developers need to
>>> understand to do the different types of streaming offered by the JDK.
>>> * To produce an API that can be implemented by multiple providers
>>> (including an RI in the JDK itself), using the ServiceLoader mechanism to
>>> provide and load a default implementation (while allowing custom
>>> implementations to be manually provided). There are a lot of

Re: Reactive Streams utility API

2018-02-28 Thread James Roper
Hi all,

We've put together a simple proposal for this. Please read the README for
an introduction to this proposal.

https://github.com/lightbend/reactive-streams-utils

Regards,

James

On 22 February 2018 at 11:47, James Roper <ja...@lightbend.com> wrote:

> Hi all,
>
> This is an email to give people a heads up that we'd like to look at
> creating an API, in the same vein as the JDK8 Streams API, for building
> reactive streams (a la JDK9 juc.Flow). Our goals for this are:
>
> * To fill a gap in the JDK where if a developer wants to do even the
> simplest of things with a JDK9 juc.Flow, such as map or filter, they need
> to bring in a third party library that implements that.
> * To produce an API that can build Publishers, Subscribers, Processors,
> and complete graphs, for the purposes of consuming APIs that use reactive
> streams (for example, JDK9 Http Client).
> * To produce an API that aligns closely with ju.stream.Stream, using it
> for inspiration for naming, scope, general API shape, and other aspects.
> The purpose of this goal is to ensure familiarity of Java developers with
> the new API, and to limit the number of concepts Java developers need to
> understand to do the different types of streaming offered by the JDK.
> * To produce an API that can be implemented by multiple providers
> (including an RI in the JDK itself), using the ServiceLoader mechanism to
> provide and load a default implementation (while allowing custom
> implementations to be manually provided). There are a lot of concerns that
> each different streams implementation provides and implements, beyond
> streaming, for example monitoring/tracing, concurrency modelling, buffering
> strategies, performance aspects of the streams handling including fusing,
> and context (eg thread local) propagation. This will allow libraries to use
> and provide contracts based on this API without depending on a particular
> implementation, and allows developers to select the implementation that
> meets their needs.
>
> Non goals:
>
> * To produce a kitchen sink of utilities for working with reactive
> streams. There already exist a number of reactive streams implementations
> that seek to meet this goal (eg, Akka Streams, Reactor, RxJava), and once
> you go past the basics (map, filter, collect), and start dealing with
> things like fan in/out, cycles, restarting, etc, the different approaches
> to solving this start to vary greatly. The JDK should provide enough to be
> useful for typical every day streaming use cases, with developers being
> able to select a third party library for anything more advanced.
>
> We will update this list when we have something ready for public review.
> This probably won't be far off. Our hope is that we can propose this as a
> JEP.
>
> Regards,
>
> James



-- 
*James Roper*
*Senior Octonaut*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>


Reactive Streams utility API

2018-02-21 Thread James Roper
Hi all,

This is an email to give people a heads up that we'd like to look at
creating an API, in the same vein as the JDK8 Streams API, for building
reactive streams (a la JDK9 juc.Flow). Our goals for this are:

* To fill a gap in the JDK where if a developer wants to do even the
simplest of things with a JDK9 juc.Flow, such as map or filter, they need
to bring in a third party library that implements that.
* To produce an API that can build Publishers, Subscribers, Processors, and
complete graphs, for the purposes of consuming APIs that use reactive
streams (for example, JDK9 Http Client).
* To produce an API that aligns closely with ju.stream.Stream, using it for
inspiration for naming, scope, general API shape, and other aspects. The
purpose of this goal is to ensure familiarity of Java developers with the
new API, and to limit the number of concepts Java developers need to
understand to do the different types of streaming offered by the JDK.
* To produce an API that can be implemented by multiple providers
(including an RI in the JDK itself), using the ServiceLoader mechanism to
provide and load a default implementation (while allowing custom
implementations to be manually provided). There are a lot of concerns that
each different streams implementation provides and implements, beyond
streaming, for example monitoring/tracing, concurrency modelling, buffering
strategies, performance aspects of the streams handling including fusing,
and context (eg thread local) propagation. This will allow libraries to use
and provide contracts based on this API without depending on a particular
implementation, and allows developers to select the implementation that
meets their needs.
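
To give a sense of the gap described in the first goal, here is roughly what
a hand-written map stage looks like when only JDK9 juc.Flow and
SubmissionPublisher are available. This is a minimal sketch, not the
proposed API: MapProcessor, CollectingSubscriber and MapDemo are names
invented for this example, and the one-at-a-time demand strategy is just the
simplest choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical "map" stage: consumes S upstream, republishes T downstream. */
final class MapProcessor<S, T> extends SubmissionPublisher<T>
        implements Flow.Processor<S, T> {
    private final Function<? super S, ? extends T> mapper;
    private Flow.Subscription upstream;

    MapProcessor(Function<? super S, ? extends T> mapper) { this.mapper = mapper; }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        this.upstream = subscription;
        subscription.request(1);            // ask for one element at a time
    }
    @Override public void onNext(S item) {
        submit(mapper.apply(item));         // may block while downstream buffers are full
        upstream.request(1);
    }
    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

/** Hypothetical sink that gathers every element into a list. */
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final CompletableFuture<List<T>> result = new CompletableFuture<>();
    private final List<T> items = new ArrayList<>();

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { result.completeExceptionally(t); }
    @Override public void onComplete() { result.complete(items); }
}

final class MapDemo {
    /** Pushes the input through source -> map -> sink and waits for the result. */
    static List<Integer> doubled(List<Integer> input) {
        CollectingSubscriber<Integer> sink = new CollectingSubscriber<>();
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            MapProcessor<Integer, Integer> map = new MapProcessor<>(x -> x * 2);
            map.subscribe(sink);            // wire downstream first so nothing is dropped
            source.subscribe(map);
            input.forEach(source::submit);
        }                                   // closing the source signals onComplete
        return sink.result.orTimeout(10, TimeUnit.SECONDS).join();
    }
}
```

For example, MapDemo.doubled(List.of(1, 2, 3)) yields [2, 4, 6]. That is a
lot of ceremony for what ju.stream.Stream expresses as a single map call.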

Non goals:

* To produce a kitchen sink of utilities for working with reactive streams.
There already exist a number of reactive streams implementations that seek
to meet this goal (eg, Akka Streams, Reactor, RxJava), and once you go past
the basics (map, filter, collect), and start dealing with things like fan
in/out, cycles, restarting, etc, the different approaches to solving this
start to vary greatly. The JDK should provide enough to be useful for
typical every day streaming use cases, with developers being able to select
a third party library for anything more advanced.

We will update this list when we have something ready for public review.
This probably won't be far off. Our hope is that we can propose this as a
JEP.

Regards,

James

-- 
*James Roper*
*Senior Octonaut*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>


Re: [concurrency-interest] We need to add blocking methods to CompletionStage!

2016-09-24 Thread James Roper
On 23 September 2016 at 07:27, Martin Buchholz <marti...@google.com> wrote:

> Thanks for the lesson, James!
>
> I took a look at the Scala/Akka/LightBend world.  Even there, blocking
> always remains a possibility, even if discouraged, e.g.
> scala.concurrent.Future extends Awaitable (!), and
> http://doc.akka.io/docs/akka/snapshot/general/actor-systems.html#Blocking_Needs_Careful_Management.
> And any general purpose Java API needs to be useful outside of a framework.
>

Yes absolutely - personally I'm not against putting join on CompletionStage
(I'm happy to leave the disagreement on this to Viktor). My main concern
regarding CS/CF is encapsulation - if a library performs an asynchronous
computation, that library should be able to safely return a future of that
(using the term future here in the general sense) to clients and remain in
control of it, without worrying about what the clients will do with it.
The future should be effectively "immutable" from the client's perspective,
that is to say, the client shouldn't be able to change its behaviour at
all.  For example, we often cache futures and return them from a library's
API; if a client could cancel a future, that would break everything else
that received that future.  We'd rather not be in a situation where, to
build a resilient API, we have to "copy" every future before we return
it, like APIs currently have to do with the Java collections API.
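
To make the caching concern concrete, here is a minimal sketch. CachedResult
and its methods are hypothetical names for this example, and the defensive
copy uses CompletableFuture.copy(), which is a JDK 9 addition; it is shown
only to illustrate the kind of copying we'd rather not be forced into.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical library-internal cache entry, for illustration only.
final class CachedResult {
    // One future shared by every caller that asks for this value.
    private final CompletableFuture<String> cached = new CompletableFuture<>();

    /** Leaks the shared future: any client can cancel or complete it. */
    CompletableFuture<String> unsafeGet() { return cached; }

    /** Defensive copy: cancelling the copy leaves the shared future untouched. */
    CompletableFuture<String> copiedGet() { return cached.copy(); }

    /** Called by the library when the computation finishes; false if already completed. */
    boolean publish(String value) { return cached.complete(value); }
}
```

With unsafeGet(), one client calling cancel(true) means publish(...) returns
false and every other holder sees a cancelled future. With copiedGet(), a
cancelled copy affects only the client that cancelled it.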


>> But of course, the exception is junit tests, in that case, we encourage
>> the use of CompletionStage.toCompletableFuture to block.
>>
>
> We're currently fixing jdk9 
> CompletableFuture.minimalCompletionStage().toCompletableFuture()
> to be awaitable.
>
> To make toCompletableFuture().join() more reliable as a recommended way to
> block, I think we should remove the encouragement to throw UOE from:
> http://download.java.net/java/jdk9/docs/api/java/util/concurrent/CompletionStage.html#toCompletableFuture--
>
> It sounds like the Akka/LightBend World is happy with current
> CompletionStage API (may I call this the "actor purist API"?), while other
> users will want a bigger, more general CompletableFuture subset for
> consuming future values.  Frustratingly, some of them will want cancel(),
> some not; and we have no good names for any new interfaces; right now all I
> can think of is CompletionStage2 and CompletionStage3 !)
>
> The current implementation of CompletableFuture has a "memory leak
> problem" in that e.g. minimalStage().toCompletableFuture().isDone() will
> leak a Completion object until the source stage itself is collected.  Which
> doesn't happen if e.g. a direct isDone() is provided.
> JDK-8161600: Garbage retention when source CompletableFutures are never
> completed
> (but no one has ever complained!)
>



-- 
*James Roper*
*Software Engineer*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>


Re: [concurrency-interest] We need to add blocking methods to CompletionStage!

2016-09-24 Thread James Roper
On 22 September 2016 at 06:43, Martin Buchholz <marti...@google.com> wrote:

> What is happening instead is API providers not using CompletionStage as
> return values in public APIs because of the lack of convenient blocking,
> and instead returning CompletableFuture, which is a tragic software
> engineering failure.
>

Out of interest, which APIs are returning CompletableFuture rather than
CompletionStage?  In the Play Framework Java API, we have embraced
CompletionStage and use it absolutely everywhere.  Likewise in Lagom
Framework and the Java API for Akka Streams.

When it comes to blocking, we strongly advocate never blocking (in the
threading models of these projects, blocking on a future will make you very
prone to deadlocks).  But of course, the exception is JUnit tests; in that
case, we encourage the use of CompletionStage.toCompletableFuture to
block.  In these projects, you'll generally encounter two different
implementations of CompletionStage: one is CompletableFuture, the other is
a scala.concurrent.Future backed implementation.  Both implement
toCompletableFuture.  A user that integrates a third party library with
its own CompletionStage implementation may of course encounter issues.  That
said, I haven't yet seen many (or any?) libraries outside of our ecosystem
that are embracing CompletionStage or CompletableFuture in their public
APIs (this is probably due to my ignorance, not due to there not being
any), which is part of why I'm interested in knowing which libraries you
know of that are using it.
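
For the test case, the pattern is roughly the following. StageTestSupport
and await are hypothetical names for this sketch, and the timeout is an
addition so that a stage which never completes fails the test instead of
hanging it. Note that a third party CompletionStage is allowed to throw
UnsupportedOperationException from toCompletableFuture, in which case this
helper would propagate that exception.

```java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper, for illustration only. Not for production code paths.
final class StageTestSupport {

    /** Blocks until the stage completes, or fails the test after the timeout. */
    static <T> T await(CompletionStage<T> stage, long timeout, TimeUnit unit) {
        try {
            return stage.toCompletableFuture().get(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt flag
            throw new AssertionError("interrupted while waiting for stage", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new AssertionError("stage did not complete successfully", e);
        }
    }
}
```

In a test this reads as: String body = StageTestSupport.await(stage, 5,
TimeUnit.SECONDS); followed by the usual assertions on body.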

> Re-adding join is easy.  We discourage CompletionStage.toCompletableFuture
> from throwing UnsupportedOperationException, and implement join as:
>
> public default T join() { return toCompletableFuture().join(); }
>
> There is a risk of multiple-inheritance conflict with Future if we add
> e.g. isDone(), but there are no current plans to turn those Future methods
> into default methods, and even if we did in some future release, it would
> be only a source, not binary incompatibility, so far less serious.
>


-- 
*James Roper*
*Software Engineer*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>