Re: Add convenience collect methods to the Stream interface
Just wanted to put my hand up to say that I have had the same experience: more than 50% of the time that I've used JDK8 streams, I've used .collect(Collectors.toList()). But, as a counterpoint, let me speculate that most uses of streams fall into two broad use cases.

One is actual stream processing, where you aren't necessarily dealing with a stream that is materialized in memory, where you may want the power of parallel processing, and where you certainly do want multiple stages of the stream processing chained together rather than having each stage materialize a new collection in memory - this is what the stream API was created for. In this case, collecting to a list is common, but perhaps not common enough to warrant a convenience method on Stream.

The other broad use case is working with Lists and Sets in a functional programming style. In this case, you simply want to map, filter, flatMap, etc., over every element in your list/set, and here collecting to a list or set is almost always the terminal operation for a stream. Although you can use the stream API to do this, I speculate that it wasn't primarily designed for this, and I think if the JDK wanted to provide first class support for it, there would be better ways to go about it - for example, by placing map/filter/flatMap methods directly on the Collection interfaces and returning collections directly, rather than having to convert to a stream and back. Going through a stream still has its advantages (eg, it uses less memory, since you don't need to materialize a new collection between each stage of the processing), but it lacks the convenience of other collection APIs that offer true functional programming for Java (eg, Vavr, formerly Javaslang).
And, if the JDK was to start adding features for functional programming like that to its collection API, then it may be worth considering other features, such as persistent collections - that is, collections where operations such as add/remove aren't destructive, ie, immutable collections that support efficient (constant time) append/prepend operations that return a copy of the list. My point is that if the JDK does start to consider adding functional programming to its collection APIs, a holistic approach should be taken to work out what things would look like long term, even if not all of those features are added immediately (or ever). On Mon, 10 Dec 2018 at 10:04, Rob Griffin (rgriffin) wrote: > Hi, > > I have raised an enhancement request (Incident Report 913453) about adding > some convenience methods to the Stream interface that collect the stream, > and Pallavi Sonal asked me to start a thread here about that. > > More than 50% of our Stream collect calls use Collectors.toList() or > Collectors.toSet() as arguments, so I think it would be very handy if the > Stream interface had default collectToList, collectToSet and > collectToMap methods. > > The advantages are: > it would be easier to use code completion in IDEs. There are a lot > of classes starting with Collect, so finding the Collectors class is a bit > of a pain. > one less method call in what is usually a long chain of calls. > > Regards, > > Rob Griffin > Software Analyst, Spotlight on SQL Server > Quest | R > rob.grif...@quest.com > > -- James Roper Architect, Office of the CTO, Lightbend, Inc. @jroper <https://twitter.com/jroper> <https://www.lightbend.com/>
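[Editorial illustration of the convenience being discussed. The method collectToList below is a hypothetical name sketching what a default method on Stream might hide; it is not a JDK API.]

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectDemo {
    // Hypothetical convenience, written as a static helper: it simply
    // hides the Collectors.toList() lookup that the thread says accounts
    // for the majority of collect calls.
    static <T> List<T> collectToList(Stream<T> stream) {
        return stream.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Functional-style use of a stream purely to transform a collection:
        List<Integer> doubled = collectToList(Stream.of(1, 2, 3).map(i -> i * 2));
        System.out.println(doubled);
    }
}
```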
Re: Stream Method Proposal: long count(Predicate predicate)
Another reason to prefer a smaller API is that it aids comprehension, since there are fewer concepts to understand. The idea behind design patterns is that APIs constrain themselves to well established and understood patterns, which means that when someone who has never seen the API before comes and sees code that uses it, they can understand the code. But that only works if APIs do constrain themselves to the design patterns - constraint is key to the advantageous application of design patterns to APIs. java.util.stream does not sit by itself; it is one of hundreds of APIs on the JVM alone, and including other languages it's among thousands of APIs that offer a functional API with filter and count abstractions. All of these APIs share filter/count as well established, well understood, instantly readable and understandable concepts. You don't have to know anything about java.util.stream to be able to understand exactly what filter(predicate).count() is doing. And this bootstrapping off a large ecosystem of APIs is one of the things that makes java.util.stream a good API. But if it were to add count/findFirst variants that take predicates? It's not unprecedented, but it is far less common, which makes it less understandable. That doesn't mean we never add convenience methods, but they do have to add a high degree of convenience to justify diverging from the well established patterns, and in this case, the convenience added is only small. On Fri, 9 Nov 2018 at 04:39, Roger Riggs wrote: > Hi Jacob, > > It's hard to resist the urge to add convenience methods; they look nice > and help a few developers. > However, they accumulate rapidly and end up obscuring the core > functionality. > They can hurt comprehension since they fold different functions together, > and the collective API surface area ends up impinging on every > developer's learning curve. > > $.02, Roger > > > On 11/07/2018 08:00 PM, Jacob Glickman wrote: > > Hello! 
> > > > I see myself having to often call count() as a terminal operation on a > > Stream immediately after performing a filter operation. How feasible > would > > it be to add an overloaded count() method that accepts a Predicate, which > > it uses as a filter before returning the count of elements in the Stream? > > If this is supported, I'd gladly create the webrev & tests for it! > > > > I suppose the method signature can be something along the lines of: > > > > long count(Predicate predicate) > > > > It would also seem reasonable to give this method to IntStream, > > DoubleStream, and LongStream, but allowing them to use IntPredicate, > > DoublePredicate, and LongPredicate respectively. > > > > Thanks, > > > > Jacob Glickman > > -- James Roper Architect, Office of the CTO, Lightbend, Inc. @jroper <https://twitter.com/jroper> <https://www.lightbend.com/>
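[Editorial illustration of the two spellings being compared. countMatching is just an illustrative helper showing the established filter-then-count composition; the overloaded count(Predicate) in the proposal does not exist in the JDK.]

```java
import java.util.function.Predicate;
import java.util.stream.Stream;

public class CountDemo {
    // The established pattern the thread defends: compose two
    // well-known operations rather than add a fused convenience method.
    static long countMatching(Stream<String> s, Predicate<String> p) {
        return s.filter(p).count();
    }

    public static void main(String[] args) {
        long n = countMatching(Stream.of("a", "bb", "ccc"), w -> w.length() > 1);
        System.out.println(n);
    }
}
```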
Why Stream.concat is a static method - type variable contravariance
With the work I'm doing at the moment creating a Reactive Streams equivalent to java.util.stream, I've often wondered why Stream.concat is a static method, rather than an instance method concatenating the given stream onto this. But I think the reason has just dawned on me, and I wanted to confirm that I'm correct. Java doesn't support contravariant type variables - it does for type declarations, but not type variables. To put it more concretely, if I had a Stream<Integer>, and I wanted to concat a Stream<Number>, this is a valid thing to do; the resulting stream would be a Stream<Number>. But doing that with an instance method would require something like this: public <S super T> Stream<S> concat(Stream<S> b); which is not supported (specifically, a lower-bounded type variable declaration is not supported). In contrast, what we have in the actual API: public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b); does allow me to concat a Stream<Integer> and a Stream<Number> with a resulting type of Stream<Number>. Is this right, or are there other reasons? Also, is there any possibility that Java might support contravariance in type variables in future? My reason for wanting it is to provide the following method for reactive streams: public Publisher<T> onErrorResumeWith(Function<Throwable, Publisher<T>> f); The intent of this method is that when a stream encounters an error, the passed function is invoked with the error, and the publisher that function returns gets concatenated to the current stream instead of the error being emitted. This could possibly be implemented with a static method: public static <T> Publisher<T> onErrorResumeWith(Publisher<? extends T> a, Function<Throwable, Publisher<? extends T>> f); But unlike concat, this method feels and reads much better as an instance method; as a static method it's a little confusing. Regards, James -- *James Roper* *Senior Developer, Office of the CTO* Lightbend <https://www.lightbend.com/> – Build reactive apps! Twitter: @jroper <https://twitter.com/jroper>
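[Editorial illustration of the point above: because the static Stream.concat takes two `Stream<? extends T>` arguments, the compiler can infer a common supertype for two differently-typed streams, which an instance method on Stream<T> could not express without a lower-bounded type variable.]

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ConcatVariance {
    public static void main(String[] args) {
        Stream<Integer> ints = Stream.of(1, 2);
        Stream<Double> doubles = Stream.of(3.0);
        // Both arguments satisfy Stream<? extends Number>, so T is
        // inferred as Number from the assignment target:
        Stream<Number> numbers = Stream.concat(ints, doubles);
        List<Number> result = numbers.collect(Collectors.toList());
        System.out.println(result);
        // An instance method ints.concat(doubles) returning Stream<Number>
        // would need something like <S super T>, which Java rejects.
    }
}
```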
Re: Reactive Streams utility API
Sounds good to me, I'll ping Viktor to make sure he sees it too. On Wed, 26 Sep 2018 at 06:34, Pavel Rappo wrote: > > > On 25 Sep 2018, at 05:33, James Roper wrote: > > > > Hi Pavel, > > > > > > > > As for the MutexExecutor itself, that was mostly written by Viktor > Klang, and I believe he wrote it based on his experience implementing > similar constructs for Akka mailboxes. There is one major problem with it > that I'm aware of - it's not fair on the underlying executor. If you submit > tasks at an equal or higher rate than can be processed by a single thread, > the executor will never return the thread it uses to the underlying > executor. I don't think that's hard to fix - we could limit the number of > sequential tasks it does on a thread before resubmitting to the underlying > executor. > > > > A slightly different incarnation of this problem is when each task > invoked resubmits another task, for example, using the current reactive > streams API, if I did ReactiveStreams.generate(() -> > "foo").forEach(System.out::println), that is by design effectively an > infinite loop that prints out foo, but being an asynchronous API it > shouldn't actually be an infinite loop, it should return the thread back to > the underlying executor at least periodically to allow that thread to be > used for other tasks queued on the executor, but it doesn't do that. > > > > But this issue (and some similar issues that may exist) we haven't begun > to consider addressing, primarily because fixing it requires selecting some > magic numbers for limits on work to do, and they can't be selected without > some realistic benchmarks being created to tune them to, and we're just not > ready to take this implementation to that level, it could change very > significantly which would change all the assumptions before it's ready to > be used. > > > > James, thanks for such a detailed explanation! Right now I'm mostly > interested > in mechanics of this executor. 
I think this discussion deserves a separate > thread on the concurrency-interest mailing list. What do you think? Once I've > started the thread there, I will get back here with the link to it > (for the interested parties' convenience). > > -Pavel > > -- *James Roper* *Senior Developer, Office of the CTO* Lightbend <https://www.lightbend.com/> – Build reactive apps! Twitter: @jroper <https://twitter.com/jroper>
Re: Reactive Streams utility API
Hi Pavel, MicroProfile doesn't do implementations, or even have a concept of a reference implementation, which is why that executor isn't there. It is however in one of the implementations of the spec that we've created at Lightbend: https://github.com/lightbend/microprofile-reactive-streams/blob/master/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/MutexExecutor.java This implementation is being called a "zero dependency" implementation, since it doesn't bring in any other dependencies, unlike the implementation that we are going to use ourselves, which uses Akka Streams. The zerodep implementation isn't production ready, our main reason for providing it is as a candidate starting point for a reference implementation in the JDK. But I also don't think it's that bad an implementation, it's just that unlike the Akka Streams based implementation (or any of the other implementations, eg RedHat's one built on RxJava and Vert.x), it has not been proven through any real world use. As for the MutexExecutor itself, that was mostly written by Viktor Klang, and I believe he wrote it based on his experience implementing similar constructs for Akka mailboxes. There is one major problem with it that I'm aware of - it's not fair on the underlying executor. If you submit tasks at an equal or higher rate than can be processed by a single thread, the executor will never return the thread it uses to the underlying executor. I don't think that's hard to fix - we could limit the number of sequential tasks it does on a thread before resubmitting to the underlying executor. 
A slightly different incarnation of this problem is when each task invoked resubmits another task, for example, using the current reactive streams API, if I did ReactiveStreams.generate(() -> "foo").forEach(System.out::println), that is by design effectively an infinite loop that prints out foo, but being an asynchronous API it shouldn't actually be an infinite loop, it should return the thread back to the underlying executor at least periodically to allow that thread to be used for other tasks queued on the executor, but it doesn't do that. But this issue (and some similar issues that may exist) we haven't begun to consider addressing, primarily because fixing it requires selecting some magic numbers for limits on work to do, and they can't be selected without some realistic benchmarks being created to tune them to, and we're just not ready to take this implementation to that level, it could change very significantly which would change all the assumptions before it's ready to be used. Cheers, James On Fri, 21 Sep 2018 at 21:16, Pavel Rappo wrote: > Hi James, > > It sounds like the project is being very active. Glad to hear that! No > matter > how good a mailing list is, in practice, it cannot be the best mailing > list for > every topic there is. core-libs-dev is no exception. It's good to see > you've > found another way to make progress on the project. > > The question I was going to ask you was on the implementation internals > > org.reactivestreams.utils.impl.MutexExecutor > > However, after examining the new repository, I found that not only has the > code > in question disappeared, but the structure of the project has changed as > well. > > Will the API have the default implementation (ReactiveStreamsEngine)? > > -Pavel > > -- *James Roper* *Senior Developer, Office of the CTO* Lightbend <https://www.lightbend.com/> – Build reactive apps! Twitter: @jroper <https://twitter.com/jroper>
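[Editorial sketch of the fairness fix described above: limit the number of sequential tasks run on a thread before resubmitting to the underlying executor. This is a deliberately simplified stand-in, not the actual MutexExecutor; the class name and the batch limit of 16 are invented for illustration.]

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

class FairSerializedExecutor implements Executor {
    private static final int MAX_RUN = 16; // illustrative batch limit
    private final Executor underlying;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean running = new AtomicBoolean();

    FairSerializedExecutor(Executor underlying) { this.underlying = underlying; }

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
        trySchedule();
    }

    private void trySchedule() {
        // Only one drain loop runs at a time, so tasks execute serially.
        if (!tasks.isEmpty() && running.compareAndSet(false, true)) {
            underlying.execute(this::drain);
        }
    }

    private void drain() {
        try {
            // Run at most MAX_RUN tasks, then give the thread back to the
            // underlying executor instead of hogging it indefinitely.
            for (int i = 0; i < MAX_RUN; i++) {
                Runnable t = tasks.poll();
                if (t == null) break;
                t.run();
            }
        } finally {
            running.set(false);
        }
        // Resubmit if work remains (or arrived during the race window).
        trySchedule();
    }
}

public class FairSerializedExecutorDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<Integer> out = new CopyOnWriteArrayList<>();
        Executor direct = Runnable::run; // same-thread executor for the demo
        FairSerializedExecutor exec = new FairSerializedExecutor(direct);
        for (int i = 0; i < 40; i++) {
            int n = i;
            exec.execute(() -> out.add(n));
        }
        System.out.println(out.size());
        System.out.println(out.get(0) == 0 && out.get(39) == 39);
    }
}
```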
Re: Reactive Streams utility API
Hi Pavel, Thanks for asking about this. I was meant to send an update to the list earlier but never got around to it. As you probably noticed from the history of this thread, there was a deafening silence in response to it. We decided that core-libs-dev might not have been the best place to start this effort. So we've taken it to Eclipse MicroProfile, and the intention is to incubate it there, and then once we have something that we're reasonably happy with, look at contributing that to the JDK. That strategy has so far gone really well. The project lives here: https://github.com/eclipse/microprofile-reactive-streams We have a specification: http://download.eclipse.org/microprofile/microprofile-reactive-1.0-M1/microprofile-reactive-streams-operators-spec.html A TCK (see the TCK section of the spec for info). Comprehensive API docs: http://download.eclipse.org/microprofile/microprofile-reactive-1.0-M1/apidocs/ And all this has been produced by a team of around 10 contributors from five different companies (Lightbend, RedHat, IBM, Payara and Tomitribe), with all meetings recorded and minuted: https://docs.google.com/document/d/1UKBBNFn-CeMih3lNUGvpShcL2aTpnvmJs9LQ8qT9OI0/edit# We are about to release a 1.0-RC1, and are planning to have a 1.0 out in time for Oracle Code One, where there will be some presentations on it. My plan was, once 1.0 was released, to resume discussions on this list about whether this could be contributed to the JDK, possibly as an incubating module. Are there any questions in particular that you have about it? If you're interested, we have an open meeting every Tuesday, 1100 UTC, that you're welcome to join (actually next week we're expecting some other Oracle devs to join from the team that produced Helidon). 
A link to the calendar event for this meeting, with links to the Zoom meeting and minutes, can be found here: https://calendar.google.com/calendar/event?eid=NDk3ZjQxMjkyZGh1N2lrZnU2dTk0Y29laWtfMjAxODA5MjVUMTEwMDAwWiBnYm5iYzM3M2dhNDBuMHR2Ymw4OG5rYzNyNEBn=Etc/GMT Cheers, James On Tue, 28 Aug 2018 at 00:33, Pavel Rappo wrote: > Hi James, > > What's the status of the project? Has it been abandoned? > > -Pavel > > > On 22 Mar 2018, at 00:09, James Roper wrote: > > > > Hi all, > > > > We've created a reference implementation. It's been done in such a way > that > > implementation of new features (stages) is quite straight forward, there > is > > an abstraction that does the hard work of handling concurrency, the > > reactive streams spec conformance, and managing demand and buffering, and > > error handling, and then individual stages (eg, map, filter, flatMap) are > > implemented using a very easy to use API (note, this API/abstraction is > all > > private, internal to the reference implementation). Rudimentary tests on > > performance show that it's not terrible compared to other reactive > streams > > implementations, with a number of clear optimization paths identified > > should we decide that's necessary. I believe this proposal is now close > to > > complete - the remaining work is: > > > > * Decide what scope beyond JDK8 streams it should support - while this > > decision is not trivial, the amount of work required to actually add > these > > to the API and implement in the reference implementation is trivial. > > * Fill out the TCK with more rigorous verification. > > * Create some rigorous benchmarks. > > > > I'm not sure what should be done next. I have talked to a number of > people > > who are either involved in, or are writing APIs that use Reactive Streams > > in private, and interest seems high. Also, there is general consensus in > > public discussions in the Jakarta EE/MicroProfile communities that an API > > like this would be very valuable in the JDK. 
The API of course could be > > done in Jakarta EE instead, but given that Reactive Streams is part of > and > > used by the JDK, and given that the JDK8 Streams API is part of the JDK, > > Jakarta EE would seem an odd place to put this library - it essentially > > would mean that to make effective use of JDK libraries that use Reactive > > Streams (eg HTTP client, possibly java.sql2 aka ADBA), you need to use > > Jakarta EE (or some third party streaming library). > > > > So unless there's any major feedback coming here on this list, I'd like > to > > put this forward as a JEP. > > > > Regards, > > > > James > > > > On 15 March 2018 at 12:24, James Roper wrote: > > > >> Hi all, > >> > >> An update on this. We've now filled out the API with feature parity with > >> the JDK8 Streams API - for operators that make sense in Reactive > Streams. > >> We'
Should Java have a unit type?
Hi all, I'm interested in opinions on whether Java should have a unit type. I apologise if this has already been discussed on this list before; I did search for it but didn't find anything. By unit type, I'm referring to a type that represents no value. When a Java method returns no value, void is used; however, void is not a type. java.lang.Void exists as a placeholder class for use in reflection, but it doesn't seem that it's meant to be used directly as a type. In spite of this, java.lang.Void is used in some places in the JDK as a type. For example, it's used by CompletionStage in methods such as thenAccept, which returns CompletionStage<Void>, to represent an operation that can complete or fail in future but has no value: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#thenAccept-java.util.function.Consumer- Working with it in this way is a little clunky and counterintuitive. Void is uninstantiable, so how can a CompletionStage<Void> ever be redeemed? The answer is, it gets redeemed with null. Are null and void the same thing? Apparently, according to this API, but they aren't really: null represents the absence of a value that could exist, and is treated as such in APIs like java.util.Optional, which is a different thing from no value. Now perhaps the use of Void in this way by CompletionStage means the horse has already bolted, and that Void is now a unit type with a singleton instance represented by null, by default. But in my opinion, it really isn't ideal. One issue is that it means any API that uses Void types won't play well with other APIs that don't accept nulls - a consequence of void and null being different things. 
A big one in this case is Reactive Streams: onNext does not accept null, so if you asynchronously map a reactive stream, and the asynchronous operation you do returns CompletionStage<Void> (this is actually a common thing to do - eg, if you're doing a sequence of operations, and the last one is a commit, it is becoming common practice for asynchronous commit methods to return CompletionStage<Void>), then you need to thenApply that CompletionStage<Void> to some other non-null unit type before you can emit it to the stream. So it's my opinion that Java should have a unit type. This could be done by promoting Void to a unit type, not just a placeholder type, and giving it a singleton instance. That approach would mean existing uses of Void as a unit type would be source compatible, and could migrate to using the instance instead of null. Another approach would be to introduce a new type, perhaps java.lang.Unit. Both Scala and Kotlin have their own unit types, scala.Unit and kotlin.Unit respectively. Akka did something a little more interesting: it has two unit types, used to indicate two different intents. The first is NotUsed, which is used to indicate that a type parameter has no significant meaning, and the second is Done, which is used to indicate that whatever the type represents has been completed successfully - this type is typically used in combination with CompletionStage, while NotUsed might be used in place of Void in PrivilegedAction, as described here: https://docs.oracle.com/javase/8/docs/technotes/guides/security/doprivileged.html Regards, James -- *James Roper* *Senior Octonaut* Lightbend <https://www.lightbend.com/> – Build reactive apps! Twitter: @jroper <https://twitter.com/jroper>
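[Editorial illustration of the problem described above: a CompletionStage<Void> can only be redeemed with null, so a non-null unit value is needed before handing the result to a null-rejecting API. The Done enum here is an invented illustration in the style of Akka's Done, not a proposed JDK type.]

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class UnitDemo {
    // Hypothetical unit type with a singleton instance:
    enum Done { INSTANCE }

    public static void main(String[] args) {
        // Today: a CompletionStage<Void> is redeemed with null...
        CompletionStage<Void> voidStage = CompletableFuture.runAsync(() -> {});
        System.out.println(voidStage.toCompletableFuture().join());

        // ...so an API that rejects null (like Reactive Streams onNext)
        // forces a thenApply to a non-null unit value first:
        CompletionStage<Done> doneStage = voidStage.thenApply(v -> Done.INSTANCE);
        System.out.println(doneStage.toCompletableFuture().join());
    }
}
```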
Multi-Release jars intended use cases
Hi all, According to Alan Bateman, MR jars can be used to introduce JDK9 API features on classes, such that javac can compile code against them to use the JDK9 specific API provided by the JDK9 version of the class, and still have the same classes be compatible with (and loadable by) JDK8: http://kto.so/2017/09/30/the-practical-truth-about-multi-release-jars/#comment-3563924109 My question is whether this is a recommended practice or not. Let me put forward a practical example of a place where we might want to use this. The 4 Reactive Streams interfaces (Publisher, Subscriber, Processor and Subscription) were first available in the org.reactivestreams project, under the package name org.reactivestreams. They were then added to JDK9, as inner interfaces of java.util.concurrent.Flow. Would it be considered a good use of MR jars to make the org.reactivestreams interfaces extend their juc.Flow counterparts when compiled for and running on JDK9+? So, for example, on JDK8 and earlier you get:

package org.reactivestreams;

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

While on JDK9, using the MR feature, you get this:

package org.reactivestreams;

import java.util.concurrent.Flow;

public interface Publisher<T> extends Flow.Publisher<T> {
    void subscribe(Subscriber<? super T> s);

    default void subscribe(Flow.Subscriber<? super T> s) {
        if (s instanceof Subscriber) {
            this.subscribe((Subscriber<? super T>) s);
        } else {
            this.subscribe(new FlowSubscriberAdapter<>(s));
        }
    }
}

The advantage of this approach is that it means if you have a library that produces org.reactivestreams.Publisher, then without any changes to that library to support JDK9, you can pass it to another library (eg, the JDK9 HTTP client) that accepts Flow.Publisher. Of course, it's not a panacea for the migration problem of org.reactivestreams to juc.Flow, but it goes a long way to helping. 
But of course, while the JDK *does* support this, that doesn't necessarily mean that we *should* use it that way, especially if it wasn't intended to be used in that way. Tooling, such as IDEs and compilers for other JVM languages, probably won't support this out of the box and may still need to catch up, and if it was never intended to be used in that way in the first place, then the tooling may decide not to implement it to work in that way. So it would be nice to have an official statement about whether the above use case is using the MR feature as intended, and whether that use case is considered a good practice or not. Regards, James -- *James Roper* *Senior Octonaut* Lightbend <https://www.lightbend.com/> – Build reactive apps! Twitter: @jroper <https://twitter.com/jroper>
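[Editorial sketch: the FlowSubscriberAdapter referenced in the example above is never shown in the email, so this is a guess at its shape. The RsSubscriber and RsSubscription interfaces are local stand-ins for the org.reactivestreams types, so the example is self-contained and runnable.]

```java
import java.util.concurrent.Flow;

// Stand-ins for org.reactivestreams.Subscription / Subscriber:
interface RsSubscription { void request(long n); void cancel(); }
interface RsSubscriber<T> {
    void onSubscribe(RsSubscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

// Adapts a juc.Flow.Subscriber so it can be subscribed to a
// Reactive Streams publisher: every signal is forwarded to the delegate.
class FlowSubscriberAdapter<T> implements RsSubscriber<T> {
    private final Flow.Subscriber<? super T> delegate;
    FlowSubscriberAdapter(Flow.Subscriber<? super T> delegate) { this.delegate = delegate; }

    public void onSubscribe(RsSubscription s) {
        delegate.onSubscribe(new Flow.Subscription() {
            public void request(long n) { s.request(n); }
            public void cancel() { s.cancel(); }
        });
    }
    public void onNext(T t) { delegate.onNext(t); }
    public void onError(Throwable t) { delegate.onError(t); }
    public void onComplete() { delegate.onComplete(); }
}

public class AdapterDemo {
    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        Flow.Subscriber<String> flowSub = new Flow.Subscriber<String>() {
            public void onSubscribe(Flow.Subscription s) { s.request(1); }
            public void onNext(String item) { log.append(item); }
            public void onError(Throwable t) { log.append("err"); }
            public void onComplete() { log.append("!"); }
        };
        RsSubscriber<String> adapted = new FlowSubscriberAdapter<>(flowSub);
        adapted.onSubscribe(new RsSubscription() {
            public void request(long n) { }
            public void cancel() { }
        });
        adapted.onNext("hi");
        adapted.onComplete();
        System.out.println(log);
    }
}
```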
Re: Reactive Streams utility API
Hi all, We've created a reference implementation. It's been done in such a way that implementation of new features (stages) is quite straightforward: there is an abstraction that does the hard work of handling concurrency, the reactive streams spec conformance, managing demand and buffering, and error handling, and then individual stages (eg, map, filter, flatMap) are implemented using a very easy to use API (note, this API/abstraction is all private, internal to the reference implementation). Rudimentary tests on performance show that it's not terrible compared to other reactive streams implementations, with a number of clear optimization paths identified should we decide that's necessary. I believe this proposal is now close to complete - the remaining work is:

* Decide what scope beyond JDK8 streams it should support - while this decision is not trivial, the amount of work required to actually add these to the API and implement in the reference implementation is trivial.
* Fill out the TCK with more rigorous verification.
* Create some rigorous benchmarks.

I'm not sure what should be done next. I have talked to a number of people who are either involved in, or are writing, APIs that use Reactive Streams in private, and interest seems high. Also, there is general consensus in public discussions in the Jakarta EE/MicroProfile communities that an API like this would be very valuable in the JDK. The API of course could be done in Jakarta EE instead, but given that Reactive Streams is part of and used by the JDK, and given that the JDK8 Streams API is part of the JDK, Jakarta EE would seem an odd place to put this library - it essentially would mean that to make effective use of JDK libraries that use Reactive Streams (eg HTTP client, possibly java.sql2 aka ADBA), you need to use Jakarta EE (or some third party streaming library). So unless there's any major feedback coming here on this list, I'd like to put this forward as a JEP. 
Regards, James On 15 March 2018 at 12:24, James Roper <ja...@lightbend.com> wrote: > Hi all, > > An update on this. We've now filled out the API with feature parity with > the JDK8 Streams API - for operators that make sense in Reactive Streams. > We've provided example implementations of the API backed by both Akka > Streams and rxjava, showing that it can be widely implemented. The TCK > still needs some work, but covers the major features, and comprehensively > covers all publishers/subscribers with the Reactive Streams TCK (so > comprehensive that we actually found two Reactive Streams TCK violations in > Akka Streams with this, and a couple in rxjava too). > > There are two major areas of work left to get something that would be > ready to be a candidate for a final API. The first is to produce a zero > dependency reference implementation of the API. This is what I plan on > starting on next. > > The second is to decide what additional operators and generators the API > should provide. So far, the scope has been mostly limited to a subset of > the JDK8 Streams scope, only a few additional API pieces have been created, > such as a few variations on flatMap (one that supports CompletionStage, and > one that supports Iterable). 
There are a number of other features for > consideration to provide basic necessary functionality for this API, here's > some examples off the top of my head (some of these can already be > implemented in terms of other stages already provided): > > * A cancelled subscriber (useful for cleaning up hot publishers) > * An ignoring subscriber (useful when you do the actual work in a previous > stage of the graph, such as mapping to completion stages) > * Error handling subscribers and/or processors > * Termination listening subscribers and/or processors > * A processor that wraps a subscriber and publisher > * The ability to merge streams - so far only concat is provided, and all > flatMaps are essentially concatenations, merge variants may be useful > (though introduce a lot of complexity, such as specifying breadth) > * The ability to split streams into sub streams - a use case for this is > in parsing a stream that contains potentially large sub streams, like > parsing a multipart/form-data body > * Batching of elements in a stream, based on predicate, influenced by > backpressure or based on a scheduled tick > * Scheduled features such as emitting ticks, rate limiting, etc > * The ability to control buffering and asynchronous boundaries within a > graph > * Naming of stages for debug/error reporting/monitoring purposes > > Not all of the above may be absolutely necessary, but should be > considered, and there may be other features as well that would be useful to > consider. > > Please visit the repo and any feedback would be much appreciated: > > https://github.com/lightbend/rea
Re: Reactive Streams utility API
Hi all, An update on this. We've now filled out the API with feature parity with the JDK8 Streams API - for operators that make sense in Reactive Streams. We've provided example implementations of the API backed by both Akka Streams and rxjava, showing that it can be widely implemented. The TCK still needs some work, but covers the major features, and comprehensively covers all publishers/subscribers with the Reactive Streams TCK (so comprehensive that we actually found two Reactive Streams TCK violations in Akka Streams with this, and a couple in rxjava too). There are two major areas of work left to get something that would be ready to be a candidate for a final API. The first is to produce a zero dependency reference implementation of the API. This is what I plan on starting on next. The second is to decide what additional operators and generators the API should provide. So far, the scope has been mostly limited to a subset of the JDK8 Streams scope, only a few additional API pieces have been created, such as a few variations on flatMap (one that supports CompletionStage, and one that supports Iterable). 
There are a number of other features for consideration to provide basic necessary functionality for this API, here's some examples off the top of my head (some of these can already be implemented in terms of other stages already provided):

* A cancelled subscriber (useful for cleaning up hot publishers)
* An ignoring subscriber (useful when you do the actual work in a previous stage of the graph, such as mapping to completion stages)
* Error handling subscribers and/or processors
* Termination listening subscribers and/or processors
* A processor that wraps a subscriber and publisher
* The ability to merge streams - so far only concat is provided, and all flatMaps are essentially concatenations, merge variants may be useful (though introduce a lot of complexity, such as specifying breadth)
* The ability to split streams into sub streams - a use case for this is in parsing a stream that contains potentially large sub streams, like parsing a multipart/form-data body
* Batching of elements in a stream, based on predicate, influenced by backpressure or based on a scheduled tick
* Scheduled features such as emitting ticks, rate limiting, etc
* The ability to control buffering and asynchronous boundaries within a graph
* Naming of stages for debug/error reporting/monitoring purposes

Not all of the above may be absolutely necessary, but should be considered, and there may be other features as well that would be useful to consider. Please visit the repo and any feedback would be much appreciated: https://github.com/lightbend/reactive-streams-utils Regards, James On 8 March 2018 at 03:59, Brian Goetz <brian.go...@oracle.com> wrote: > To answer the questions at the bottom: the next step is to start working > on this and get folks excited about contributing. There's plenty of time > for process later, but filing a JEP or creating a project shouldn't be a > barrier to innovating. 
> > > On 2/28/2018 10:33 PM, James Roper wrote:
> > [earlier messages quoted in full; quote trimmed]
Re: Reactive Streams utility API
Hi all,

We've put together a simple proposal for this. Please read the README for an introduction to this proposal.

https://github.com/lightbend/reactive-streams-utils

Regards,

James

On 22 February 2018 at 11:47, James Roper <ja...@lightbend.com> wrote:
> [original announcement quoted in full; quote trimmed]

--
*James Roper*
*Senior Octonaut*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>
Reactive Streams utility API
Hi all,

This is an email to give people a heads up that we'd like to look at creating an API, in the same vein as the JDK8 Streams API, for building reactive streams (a la JDK9 juc.Flow). Our goals for this are:

* To fill a gap in the JDK where if a developer wants to do even the simplest of things with a JDK9 juc.Flow, such as map or filter, they need to bring in a third party library that implements that.

* To produce an API that can build Publishers, Subscribers, Processors, and complete graphs, for the purposes of consuming APIs that use reactive streams (for example, JDK9 Http Client).

* To produce an API that aligns closely with ju.stream.Stream, using it for inspiration for naming, scope, general API shape, and other aspects. The purpose of this goal is to ensure familiarity of Java developers with the new API, and to limit the number of concepts Java developers need to understand to do the different types of streaming offered by the JDK.

* To produce an API that can be implemented by multiple providers (including an RI in the JDK itself), using the ServiceLoader mechanism to provide and load a default implementation (while allowing custom implementations to be manually provided). There are a lot of concerns that each different streams implementation provides and implements, beyond streaming, for example monitoring/tracing, concurrency modelling, buffering strategies, performance aspects of the streams handling including fusing, and context (eg thread local) propagation. This will allow libraries to use and provide contracts based on this API without depending on a particular implementation, and allows developers to select the implementation that meets their needs.

Non goals:

* To produce a kitchen sink of utilities for working with reactive streams. There already exist a number of reactive streams implementations that seek to meet this goal (eg, Akka Streams, Reactor, RxJava), and once you go past the basics (map, filter, collect) and start dealing with things like fan in/out, cycles, restarting, etc, the different approaches to solving this start to vary greatly. The JDK should provide enough to be useful for typical every day streaming use cases, with developers being able to select a third party library for anything more advanced.

We will update this list when we have something ready for public review. This probably won't be far off. Our hope is that we can propose this as a JEP.

Regards,

James

--
*James Roper*
*Senior Octonaut*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>
Re: [concurrency-interest] We need to add blocking methods to CompletionStage!
On 23 September 2016 at 07:27, Martin Buchholz <marti...@google.com> wrote:
> Thanks for the lesson, James!
>
> I took a look at the Scala/Akka/LightBend world. Even there, blocking
> always remains a possibility, even if discouraged, e.g.
> scala.concurrent.Future extends Awaitable (!), and
> http://doc.akka.io/docs/akka/snapshot/general/actor-systems.html#Blocking_Needs_Careful_Management.
> And any general purpose Java API needs to be useful outside of a framework.

Yes absolutely - personally I'm not against putting join on CompletionStage (I'm happy to leave the disagreement over this to Viktor). My main concern regarding CS/CF is encapsulation: if a library performs an asynchronous computation, that library should be able to safely return a future of that (using the term future here in the general sense) to clients and remain in control of it, without worrying about what the clients will do with it. The future should be effectively "immutable" from the client's perspective, that is to say, the client shouldn't be able to change its behaviour at all. For example, we often cache futures and return them from a library's API; if a client could cancel such a future, that would break everything else that received it. We'd rather not be in a situation where, to build a resilient API, we have to "copy" every future before we return it, like APIs currently have to do with the Java collections API.

>> But of course, the exception is junit tests, in that case, we encourage
>> the use of CompletionStage.toCompletableFuture to block.
>
> We're currently fixing jdk9
> CompletableFuture.minimalCompletionStage().toCompletableFuture()
> to be awaitable.
>
> To make toCompletableFuture().join() more reliable as a recommended way to
> block, I think we should remove the encouragement to throw UOE from:
> http://download.java.net/java/jdk9/docs/api/java/util/concurrent/CompletionStage.html#toCompletableFuture--
>
> It sounds like the Akka/LightBend world is happy with the current
> CompletionStage API (may I call this the "actor purist API"?), while other
> users will want a bigger, more general CompletableFuture subset for
> consuming future values. Frustratingly, some of them will want cancel(),
> some not; and we have no good names for any new interfaces; right now all I
> can think of is CompletionStage2 and CompletionStage3!
>
> The current implementation of CompletableFuture has a "memory leak
> problem" in that e.g. minimalStage().toCompletableFuture().isDone() will
> leak a Completion object until the source stage itself is collected. Which
> doesn't happen if e.g. a direct isDone() is provided.
> JDK-8161600: Garbage retention when source CompletableFutures are never
> completed
> (but no one has ever complained!)

--
*James Roper*
*Software Engineer*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>
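The encapsulation concern discussed above - handing clients a future without letting them complete or cancel the library's own copy - can be addressed in JDK 9 with CompletableFuture.copy() or minimalCompletionStage(). A sketch (the "library"/"client" framing is illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class DefensiveFutures {
    public static void main(String[] args) {
        // The library keeps the "real" future private...
        CompletableFuture<String> internal = new CompletableFuture<>();

        // ...and hands clients a defensive copy. Cancelling (or completing)
        // the copy does not propagate back to the source future.
        CompletableFuture<String> handedOut = internal.copy();
        handedOut.cancel(true);

        internal.complete("result");
        System.out.println(internal.join()); // prints result

        // minimalCompletionStage() goes further: the returned stage exposes
        // only CompletionStage methods, and calling toCompletableFuture()
        // on it yields a future similarly detached from the source.
        CompletionStage<String> minimal = internal.minimalCompletionStage();
        System.out.println(minimal.toCompletableFuture().join());
    }
}
```

This is exactly the per-handout "copying" the message above says libraries would rather not be forced into - but it shows the JDK 9 mechanism does exist.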
Re: [concurrency-interest] We need to add blocking methods to CompletionStage!
On 22 September 2016 at 06:43, Martin Buchholz <marti...@google.com> wrote:
> What is happening instead is API providers not using CompletionStage as
> return values in public APIs because of the lack of convenient blocking,
> and instead returning CompletableFuture, which is a tragic software
> engineering failure.

Out of interest, which APIs are returning CompletableFuture rather than CompletionStage? In the Play Framework Java API, we have embraced CompletionStage; we use it absolutely everywhere. Likewise in the Lagom Framework and the Java API for Akka Streams.

When it comes to blocking, we strongly advocate never blocking (in the threading models of these projects, blocking on a future will make you very prone to deadlocks). But of course, the exception is junit tests; in that case, we encourage the use of CompletionStage.toCompletableFuture to block.

In these projects, you'll generally encounter two different implementations of CompletionStage: one is CompletableFuture, the other is a scala.concurrent.Future backed implementation. Both implement toCompletableFuture. A user that integrates a third party library with its own CompletionStage implementation may of course encounter issues. That said, I haven't yet seen many (or any?) libraries outside of our ecosystem embracing CompletionStage or CompletableFuture in their public APIs (this is probably due to my ignorance, not due to there not being any), which is part of why I'm interested in knowing what libraries you know of that are using it.

> Re-adding join is easy. We discourage CompletionStage.toCompletableFuture
> from throwing UnsupportedOperationException, and implement join as:
>
>     public default T join() { return toCompletableFuture().join(); }
>
> There is a risk of multiple-inheritance conflict with Future if we add
> e.g.
isDone(), but there are no current plans to turn those Future methods > into default methods, and even if we did in some future release, it would > be only a source, not binary incompatibility, so far less serious. > > ___ > Concurrency-interest mailing list > concurrency-inter...@cs.oswego.edu > http://cs.oswego.edu/mailman/listinfo/concurrency-interest > > -- *James Roper* *Software Engineer* Lightbend <https://www.lightbend.com/> – Build reactive apps! Twitter: @jroper <https://twitter.com/jroper>
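Martin's proposed default join() can be demonstrated in isolation. The Stage interface below is a hypothetical single-method stand-in for CompletionStage, just to show how the default delegates to toCompletableFuture():

```java
import java.util.concurrent.CompletableFuture;

public class JoinSketch {
    // Hypothetical stand-in for CompletionStage, showing the proposed
    // default join() delegating to toCompletableFuture().join(). This only
    // works reliably if toCompletableFuture() never throws UOE - hence the
    // suggestion above to remove the encouragement to throw it.
    interface Stage<T> {
        CompletableFuture<T> toCompletableFuture();

        default T join() {
            return toCompletableFuture().join();
        }
    }

    public static void main(String[] args) {
        // Stage has a single abstract method, so a lambda suffices here.
        Stage<String> stage = () -> CompletableFuture.supplyAsync(() -> "ok");
        System.out.println(stage.join()); // prints ok
    }
}
```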