Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-22 Thread David Alayachew
I've seen your responses in the Reddit thread. Thanks again!

But yes, I would love to be kept in the loop about any further developments
on this topic!

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-21 Thread David Alayachew
👍

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-21 Thread Viktor Klang
Hi David,

I might respond to the Reddit thread later; I need to wrap up some other
tasks first.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-21 Thread David Alayachew
Thanks for the context, Viktor. I think I got to see that breadth vs depth
threshold myself when working with danielaveryj on Reddit. They explained it
largely in the way that you did, and we experimented with batch sizes until
we found the cutoff point where things broke down.

But thanks again for the context. If you have more thoughts or you ever do
make a decision on this, I would love to know!

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-19 Thread David Alayachew
Ty vm, I appreciate you digging into this.

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-19 Thread Viktor Klang
Another thing which might add some detail:

This isn't really about short-circuiting under parallelization but rather a 
trade-off between depth-first vs breadth-first processing.

For sequential evaluation you typically want to perform depth-first processing 
since that means that you occupy ~constant space.

For parallel evaluation you need to perform [some level of] breadth-first 
processing (since otherwise there is no parallelism). So you need to decide 
what the evaluation strategy will be and where the join/merge-points are. For 
some operations which are inherently encounter-ordered you can "place a bet" 
(think: findFirst() where if you have multiple candidates each found in 
different chunks, you only keep the one which has the "earliest" encounter 
order), but for others, like foldLeft each subsequent value depends on the 
previous.
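
To make that "bet" concrete, a minimal sketch (the findAny() result varies
from run to run; findFirst() is deterministic):

    import java.util.stream.IntStream;

    // findFirst() must honour encounter order: candidates found in later
    // chunks are discarded in favour of the earliest one.
    int first = IntStream.range(0, 1_000_000).parallel()
            .filter(i -> i % 1_000 == 0)
            .findFirst().orElseThrow();   // always 0

    // findAny() is free to keep whichever candidate some chunk found first.
    int any = IntStream.range(0, 1_000_000).parallel()
            .filter(i -> i % 1_000 == 0)
            .findAny().orElseThrow();     // any multiple of 1000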

In the foldLeft case what Gatherers provide is ofSequential, which essentially
instructs the processing that there's no use trying to parallelize this at all,
so under parallel evaluation this stage needs to run sequentially—and that
means that it must run to completion before downstream operations can continue,
and by implication this also means that the output needs intermediate storage,
so you are now in a breadth-first scenario.
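
A minimal sketch of that (assuming JDK 24+, where Gatherers.fold builds on a
sequential-only gatherer):

    import java.util.stream.Gatherers;
    import java.util.stream.Stream;

    // Even though the upstream is parallel, the fold stage runs
    // sequentially and emits its single result only once the whole input
    // has been consumed -- the breadth-first scenario described above.
    long sum = Stream.of(1L, 2L, 3L, 4L)
            .parallel()
            .gather(Gatherers.fold(() -> 0L, Long::sum))
            .findFirst()
            .orElseThrow();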

In my experience, parallel streams are better suited for CPU-bound workloads
where operations are trivially parallelizable (i.e. no encounter-order
dependencies). Importantly, it's common to need a rather large number of
elements in the stream before the benefit of parallelization outweighs its
overhead. The only way to know what makes sense is to benchmark using
realistic workloads.

Personally, I wouldn't be surprised if sequential streams plus the occasional 
Gatherers.mapConcurrent() covers > 90% of all Stream use-cases.
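
For illustration, a sketch of that pattern (assuming JDK 24+; urls and
fetchTitle are hypothetical stand-ins for a list of inputs and a blocking
I/O call):

    import java.util.List;
    import java.util.stream.Gatherers;

    // The pipeline stays sequential; up to 4 invocations of the
    // (hypothetical) fetchTitle run concurrently on virtual threads,
    // and encounter order is preserved.
    List<String> titles = urls.stream()
            .gather(Gatherers.mapConcurrent(4, url -> fetchTitle(url)))
            .toList();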

As a side-note, it is important to remember that Java Streams are push-style 
streams. (Push-style vs Pull-style vs Push-Pull-style is a longer conversation, 
but all of these strategies come with trade-offs)

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-19 Thread Viktor Klang
Hi David,

I've been thinking about this topic for a few days and haven't arrived at a
satisfactory solution. Keep in mind that this "situation" has been in place
ever since Streams were released; perhaps Gatherers are simply a catalyst that
makes it more noticeable—I'll keep thinking about what could be done to make
it more predictable (besides thinking about what rearrangements might make the
situation go away).

In general though—short-circuiting in combination with parallelization requires 
a lot of tuning to make sure that the cost of processing more data doesn't 
overtake the benefit of "exiting early".


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-19 Thread David Alayachew
Poking the thread in case you are able to answer my previous question,
Viktor.

Also, another question -- I posted this thread to Reddit, and some good
discussion has already started. Can you or someone else answer some of the
questions that have popped up there?

https://old.reddit.com/r/java/comments/1gukzhb/a_surprising_pain_point_regarding_parallel_java/

Ty vm!

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-14 Thread David Alayachew
Oh ok. So it truly is a toss-up depending on each implementation when and
where this occurs.

Ok, then as my final request, I think even informing the user that this CAN
occur is worth doing. If nothing else, the user scouring the documentation
for this behaviour will know that it is simply something that can occur.
They don't need to know all the details. Simply give it an official term,
describe the behaviour, and note that when it happens is
implementation-specific, but that it is only possible during parallelism.
Even just knowing the verbiage to describe the problem will enable users to
better communicate with each other about what they want vs what they get.
That helps searchability, if nothing else.

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-14 Thread Viktor Klang
I see what you're saying; the problem is that it depends on the Stream
implementation (given that Stream is an interface).

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-14 Thread David Alayachew
Then let me correct myself again, while simplifying -- I just want that
detail, that certain combinations might lead to pre-fetching everything, to
be documented on the Stream API. Package level, or on the Stream interface
itself, seems like a good spot.

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-14 Thread Viktor Klang
The issue here is that the operation cannot advertise this, as it depends on 
the combination of operations.


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-13 Thread David Alayachew
That is a good point, Rob.

Then let me correct myself -- I think the terminal operations don't do a
great job of advertising whether or not they pre-fetch everything when
parallelism is activated. Collectors fetch as needed; findAny() pre-fetches
everything. I understand that later releases might change their behaviour,
but I still want to document the current behaviour in the official javadocs
so that we can limit undocumented tripping hazards.


Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-13 Thread Rob Spoor
distinct() doesn't require everything to be pulled. It can push elements 
to the downstream as they come along for the first time. When 
downstream.push returns false the gatherer is done.
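
A minimal sketch of that idea (assuming JDK 24+ Gatherers; sequential for
simplicity):

    import java.util.HashSet;
    import java.util.stream.Gatherer;

    static <T> Gatherer<T, ?, T> distinctGatherer() {
        return Gatherer.ofSequential(
                HashSet<T>::new,
                (seen, element, downstream) ->
                        // Duplicates are skipped (keep integrating); new
                        // elements are pushed, and a false return from
                        // push ends the gatherer.
                        !seen.add(element) || downstream.push(element));
    }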


As part of some experimentation I've implemented all intermediate
operations using gatherers. Most of them are pretty straightforward and
will stop integrating once the downstream starts rejecting elements
(although some use Gatherer.ofSequential to keep it easy). I only found
two exceptions that don't return the result of downstream.push:


* mapMulti. The downstream.push is passed as the mapper which is a
Consumer - the return value is ignored. With some more effort it's
probably possible to capture any false return value and return that from
the integrator, but I haven't tried that yet (a possible approach is
sketched after this list).


* sorted. Obviously every element needs to be inspected.
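
For concreteness, capturing that false return might look roughly like this 
(an untested sketch against the JDK 24 Gatherer API, in the spirit of the 
experiment above):

import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Gatherer;

static <T, R> Gatherer<T, ?, R> mapMultiGatherer(
        BiConsumer<? super T, ? super Consumer<R>> mapper) {
    return Gatherer.<T, boolean[], R>ofSequential(
        () -> new boolean[1], // rejected[0] remembers a failed downstream.push
        Gatherer.Integrator.ofGreedy((rejected, element, downstream) -> {
            mapper.accept(element, r -> {
                if (!downstream.push(r))
                    rejected[0] = true; // capture the otherwise-ignored false
            });
            return !rejected[0]; // stop integrating once downstream rejects
        }));
}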


On 13/11/2024 00:37, David Alayachew wrote:

Oh sure, I expect something like distinct() to pull everything. In order to
know if something is distinct, you have to do some variant of "check
against everyone else". Whether that is holding all instances in memory or
their hashes, it's clear from a glance that you will need to look at
everything, and therefore, pre-fetching makes intuitive sense to me.

I 100% did not expect terminal operations like findAny() or reduce() to
pull the whole data set. That was a complete whiplash for me. The method
findAny() advertises itself as a short-circuiting operation, so to find out
that it actually pulls the whole data set anyways was shocking.

And that was my biggest pain point -- looking at the documentation, it is
not clear to me at all that methods like findAny() would pull in all data
upon becoming parallel().

Do you think it would make sense to add documentation about this to the
javadocs for Stream/java.util.stream? Or maybe it is already there and I
misunderstood it (even after reading through it thoroughly over 5 times).


On Tue, Nov 12, 2024, 10:06 AM Viktor Klang  wrote:


We are told how Streams can process unbounded data sets, but when it

tries to do a findAny() with parallel(), it runs into an OOME because it
fetched all the data ahead of time. In fact, almost all of the terminal
operations will hit an OOME in the exact same way if they are parallel and
have a big enough data set. It's definitely not the end of the world, but
it seems that I have to fit everything into a Collector and/or a Gatherer
if I want to avoid pre-fetching everything.

Yeah, I think it is important to distinguish "can process unbounded data
sets" from "always able to process unbounded data sets".

Some operations inherently need the end of the stream, so even something
simple like: stream.distinct() or stream.sorted() can end up pulling in all
data (which of course won't terminate).

Fortunately, I think Gatherers can unlock many more situations where
unbounded streams can be processed.

Cheers,
√


*Viktor Klang*
Software Architect, Java Platform Group
Oracle
--
*From:* David Alayachew 
*Sent:* Tuesday, 12 November 2024 15:08
*To:* Viktor Klang 
*Cc:* core-libs-dev 
*Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
fetching too many elements


Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon
as I thought of it.


I hand-waved away the idea because I thought that the method would turn
the stream pipeline parallel, thus, recreating the same problem I currently
have of parallelism causing all of the elements to be fetched ahead of
time, causing an OOME.


It did NOT occur to me that the pipeline would stay sequential, and just
kick these off sequentially, but have them executing in parallel. I can't
see why I came to that incorrect conclusion. I have read the javadocs of
this method several times. Though, to be fair, I came to the same,
incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't
until someone pointed out what the documentation was actually saying that I
realized its true properties.

Thanks. That definitely solves at least part of my problem. Obviously, I
would prefer to write to S3 in parallel too, but at the very least, the
calculation part is being done in parallel. And worst case scenario, I can
be really bad and just do the write to S3 in the mapConcurrent, and then
just return the metadata of each write, and just bundle that up with
collect.


And that's ignoring the fact that I can just use the workaround too.


Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me
from a performance perspective, but is rather unintuitive to me from a
usability perspective. We are told how Streams can process unbounded data
sets, but when it tries to do a findAny() with parallel(), it runs into an
OOME because it fetched all the data ahead of time. In fact, almost all of the
terminal operations will hit an OOME in the exact same way i

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-13 Thread David Alayachew
Makes a lot of sense. And I agree -- hopefully we do end up in a place
where all the methods on Stream do execute on the same island.

Until then, I will make do with keeping as much functionality as possible
on the Gatherers and Collectors, so that I can eliminate island hopping
entirely. At the very least, I will avoid using the non-Collector terminal
operations, as it appears that using a Collector keeps everything on the
same island for the duration of the stream.
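
In practice that means pipelines shaped like the one below (a sketch; 
process() stands in for the real per-batch work, and forEach(...) is the 
collector from Viktor's workaround):

// windowFixed + collect fuse into a single island, so batches flow
// downstream without the whole data set being pre-fetched first
stream
    .parallel()
    .unordered()
    .gather(Gatherers.windowFixed(BATCH_SIZE))
    .collect(forEach(batch -> process(batch)));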


On Wed, Nov 13, 2024, 5:00 AM Viktor Klang  wrote:

> I think the problem is that it depends on the order, and combination, of
> operations to know what executes in the same "island".
>
> My personal preference would try to end up in a place where an entire
> pipeline is executed as a single island, which would mean that
> short-circuit signals would always propagate right back to the source.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> --
> *From:* David Alayachew 
> *Sent:* Wednesday, 13 November 2024 00:37
> *To:* Viktor Klang 
> *Cc:* core-libs-dev 
> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
> Oh sure, I expect something like distinct() to pull everything. In order
> to know if something is distinct, you have to do some variant of "check
> against everyone else". Whether that is holding all instances in memory or
> their hashes, it's clear from a glance that you will need to look at
> everything, and therefore, pre-fetching makes intuitive sense to me.
>
> I 100% did not expect terminal operations like findAny() or reduce() to
> pull the whole data set. That was a complete whiplash for me. The method
> findAny() advertises itself as a short-circuiting operation, so to find out
> that it actually pulls the whole data set anyways was shocking.
>
> And that was my biggest pain point -- looking at the documentation, it is
> not clear to me at all that methods like findAny() would pull in all data
> upon becoming parallel().
>
> Do you think it would make sense to add documentation about this to the
> javadocs for Stream/java.util.stream? Or maybe it is already there and I
> misunderstood it (even after reading through it thoroughly over 5 times).
>
>
> On Tue, Nov 12, 2024, 10:06 AM Viktor Klang 
> wrote:
>
> >We are told how Streams can process unbounded data sets, but when it
> tries to do a findAny() with parallel(), it runs into an OOME because it
> fetched all the data ahead of time. In fact, almost all of the terminal
> operations will hit an OOME in the exact same way if they are parallel and
> have a big enough data set. It's definitely not the end of the world, but
> it seems that I have to fit everything into a Collector and/or a Gatherer
> if I want to avoid pre-fetching everything.
>
> Yeah, I think it is important to distinguish "can process unbounded data
> sets" from "always able to process unbounded data sets".
>
> Some operations inherently need the end of the stream, so even something
> simple like: stream.distinct() or stream.sorted() can end up pulling in all
> data (which of course won't terminate).
>
> Fortunately, I think Gatherers can unlock many more situations where
> unbounded streams can be processed.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> --
> *From:* David Alayachew 
> *Sent:* Tuesday, 12 November 2024 15:08
> *To:* Viktor Klang 
> *Cc:* core-libs-dev 
> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
>
> Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon
> as I thought of it.
>
>
> I hand-waved away the idea because I thought that the method would turn
> the stream pipeline parallel, thus, recreating the same problem I currently
> have of parallelism causing all of the elements to be fetched ahead of
> time, causing an OOME.
>
>
> It did NOT occur to me that the pipeline would stay sequential, and just
> kick these off sequentially, but have them executing in parallel. I can't
> see why I came to that incorrect conclusion. I have read the javadocs of
> this method several times. Though, to be fair, I came to the same,
> incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't
> until someone pointed out what the documentation was actually saying that I
> realized its true properties.
>
> Thanks. That definitely solves at least part of my problem. Obviously, I
> would prefer to write to S3 in parallel too, but at the very least, the
> calcul

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-13 Thread Viktor Klang
I think the problem is that it depends on the order, and combination, of 
operations to know what executes in the same "island".

My personal preference would try to end up in a place where an entire pipeline 
is executed as a single island, which would mean that short-circuit signals 
would always propagate right back to the source.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

From: David Alayachew 
Sent: Wednesday, 13 November 2024 00:37
To: Viktor Klang 
Cc: core-libs-dev 
Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching 
too many elements

Oh sure, I expect something like distinct() to pull everything. In order to 
know if something is distinct, you have to do some variant of "check against 
everyone else". Whether that is holding all instances in memory or their 
hashes, it's clear from a glance that you will need to look at everything, and 
therefore, pre-fetching makes intuitive sense to me.

I 100% did not expect terminal operations like findAny() or reduce() to pull 
the whole data set. That was a complete whiplash for me. The method findAny() 
advertises itself as a short-circuiting operation, so to find out that it 
actually pulls the whole data set anyways was shocking.

And that was my biggest pain point -- looking at the documentation, it is not 
clear to me at all that methods like findAny() would pull in all data upon 
becoming parallel().

Do you think it would make sense to add documentation about this to the 
javadocs for Stream/java.util.stream? Or maybe it is already there and I 
misunderstood it (even after reading through it thoroughly over 5 times).


On Tue, Nov 12, 2024, 10:06 AM Viktor Klang  wrote:
>We are told how Streams can process unbounded data sets, but when it tries to 
>do a findAny() with parallel(), it runs into an OOME because it fetched all 
>the data ahead of time. In fact, almost all of the terminal operations will 
>hit an 
>OOME in the exact same way if they are parallel and have a big enough data 
>set. It's definitely not the end of the world, but it seems that I have to fit 
>everything into a Collector and/or a Gatherer if I want to avoid pre-fetching 
>everything.

Yeah, I think it is important to distinguish "can process unbounded data sets" 
from "always able to process unbounded data sets".

Some operations inherently need the end of the stream, so even something simple 
like: stream.distinct() or stream.sorted() can end up pulling in all data 
(which of course won't terminate).

Fortunately, I think Gatherers can unlock many more situations where unbounded 
streams can be processed.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

From: David Alayachew 
Sent: Tuesday, 12 November 2024 15:08
To: Viktor Klang 
Cc: core-libs-dev 
Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching 
too many elements


Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon as I 
thought of it.


I hand-waved away the idea because I thought that the method would turn the 
stream pipeline parallel, thus, recreating the same problem I currently have of 
parallelism causing all of the elements to be fetched ahead of time, causing an 
OOME.


It did NOT occur to me that the pipeline would stay sequential, and just kick 
these off sequentially, but have them executing in parallel. I can't see why I 
came to that incorrect conclusion. I have read the javadocs of this method 
several times. Though, to be fair, I came to the same, incorrect conclusion 
about Collectors.groupingByConcurrent(), and it wasn't until someone pointed 
out what the documentation was actually saying that I realized its true 
properties.

Thanks. That definitely solves at least part of my problem. Obviously, I would 
prefer to write to S3 in parallel too, but at the very least, the calculation 
part is being done in parallel. And worst case scenario, I can be really bad 
and just do the write to S3 in the mapConcurrent, and then just return the 
metadata of each write, and just bundle that up with collect.


And that's ignoring the fact that I can just use the workaround too.


Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me from a 
performance perspective, but is rather unintuitive to me from a usability 
perspective. We are told how Streams can process unbounded data sets, but when 
it tries to do a findAny() with parallel(), it runs into an OOME because it 
fetched all the data ahead of time. In fact, almost all of the terminal operations 
will hit an OOME in the exact same way if they are parallel and have a big 
enough data set. It's

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-12 Thread David Alayachew
Apologies, I did not mean to add reduce(). Please ignore that part.

On Tue, Nov 12, 2024, 6:37 PM David Alayachew 
wrote:

> Oh sure, I expect something like distinct() to pull everything. In order
> to know if something is distinct, you have to do some variant of "check
> against everyone else". Whether that is holding all instances in memory or
> their hashes, it's clear from a glance that you will need to look at
> everything, and therefore, pre-fetching makes intuitive sense to me.
>
> I 100% did not expect terminal operations like findAny() or reduce() to
> pull the whole data set. That was a complete whiplash for me. The method
> findAny() advertises itself as a short-circuiting operation, so to find out
> that it actually pulls the whole data set anyways was shocking.
>
> And that was my biggest pain point -- looking at the documentation, it is
> not clear to me at all that methods like findAny() would pull in all data
> upon becoming parallel().
>
> Do you think it would make sense to add documentation about this to the
> javadocs for Stream/java.util.stream? Or maybe it is already there and I
> misunderstood it (even after reading through it thoroughly over 5 times).
>
>
> On Tue, Nov 12, 2024, 10:06 AM Viktor Klang 
> wrote:
>
>> >We are told how Streams can process unbounded data sets, but when it
>> tries to do a findAny() with parallel(), it runs into an OOME because it
>> fetched all the data ahead of time. In fact, almost all of the terminal
>> operations will hit an OOME in the exact same way if they are parallel and
>> have a big enough data set. It's definitely not the end of the world, but
>> it seems that I have to fit everything into a Collector and/or a Gatherer
>> if I want to avoid pre-fetching everything.
>>
>> Yeah, I think it is important to distinguish "can process unbounded data
>> sets" from "always able to process unbounded data sets".
>>
>> Some operations inherently need the end of the stream, so even something
>> simple like: stream.distinct() or stream.sorted() can end up pulling in all
>> data (which of course won't terminate).
>>
>> Fortunately, I think Gatherers can unlock many more situations where
>> unbounded streams can be processed.
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> --
>> *From:* David Alayachew 
>> *Sent:* Tuesday, 12 November 2024 15:08
>> *To:* Viktor Klang 
>> *Cc:* core-libs-dev 
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
>> fetching too many elements
>>
>>
>> Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon
>> as I thought of it.
>>
>>
>> I hand-waved away the idea because I thought that the method would turn
>> the stream pipeline parallel, thus, recreating the same problem I currently
>> have of parallelism causing all of the elements to be fetched ahead of
>> time, causing an OOME.
>>
>>
>> It did NOT occur to me that the pipeline would stay sequential, and just
>> kick these off sequentially, but have them executing in parallel. I can't
>> see why I came to that incorrect conclusion. I have read the javadocs of
>> this method several times. Though, to be fair, I came to the same,
>> incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't
>> until someone pointed out what the documentation was actually saying that I
>> realized its true properties.
>>
>> Thanks. That definitely solves at least part of my problem. Obviously, I
>> would prefer to write to S3 in parallel too, but at the very least, the
>> calculation part is being done in parallel. And worst case scenario, I can
>> be really bad and just do the write to S3 in the mapConcurrent, and then
>> just return the metadata of each write, and just bundle that up with
>> collect.
>>
>>
>> And that's ignoring the fact that I can just use the workaround too.
>>
>>
>> Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me
>> from a performance perspective, but is rather unintuitive to me from a
>> usability perspective. We are told how Streams can process unbounded data
>> sets, but when it tries to do a findAny() with parallel(), it runs into an
>> OOME because it fetched all the data ahead of time. In fact, almost all of the
>> terminal operations will hit an OOME in the exact same way if they are
>> parallel and have a big enough data set. It's definitely not 

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-12 Thread David Alayachew
Oh sure, I expect something like distinct() to pull everything. In order to
know if something is distinct, you have to do some variant of "check
against everyone else". Whether that is holding all instances in memory or
their hashes, it's clear from a glance that you will need to look at
everything, and therefore, pre-fetching makes intuitive sense to me.

I 100% did not expect terminal operations like findAny() or reduce() to
pull the whole data set. That was a complete whiplash for me. The method
findAny() advertises itself as a short-circuiting operation, so to find out
that it actually pulls the whole data set anyways was shocking.

And that was my biggest pain point -- looking at the documentation, it is
not clear to me at all that methods like findAny() would pull in all data
upon becoming parallel().

Do you think it would make sense to add documentation about this to the
javadocs for Stream/java.util.stream? Or maybe it is already there and I
misunderstood it (even after reading through it thoroughly over 5 times).


On Tue, Nov 12, 2024, 10:06 AM Viktor Klang  wrote:

> >We are told how Streams can process unbounded data sets, but when it
> tries to do a findAny() with parallel(), it runs into an OOME because it
> fetched all the data ahead of time. In fact, almost all of the terminal
> operations will hit an OOME in the exact same way if they are parallel and
> have a big enough data set. It's definitely not the end of the world, but
> it seems that I have to fit everything into a Collector and/or a Gatherer
> if I want to avoid pre-fetching everything.
>
> Yeah, I think it is important to distinguish "can process unbounded data
> sets" from "always able to process unbounded data sets".
>
> Some operations inherently need the end of the stream, so even something
> simple like: stream.distinct() or stream.sorted() can end up pulling in all
> data (which of course won't terminate).
>
> Fortunately, I think Gatherers can unlock many more situations where
> unbounded streams can be processed.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> --
> *From:* David Alayachew 
> *Sent:* Tuesday, 12 November 2024 15:08
> *To:* Viktor Klang 
> *Cc:* core-libs-dev 
> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
>
> Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon
> as I thought of it.
>
>
> I hand-waved away the idea because I thought that the method would turn
> the stream pipeline parallel, thus, recreating the same problem I currently
> have of parallelism causing all of the elements to be fetched ahead of
> time, causing an OOME.
>
>
> It did NOT occur to me that the pipeline would stay sequential, and just
> kick these off sequentially, but have them executing in parallel. I can't
> see why I came to that incorrect conclusion. I have read the javadocs of
> this method several times. Though, to be fair, I came to the same,
> incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't
> until someone pointed out what the documentation was actually saying that I
> realized its true properties.
>
> Thanks. That definitely solves at least part of my problem. Obviously, I
> would prefer to write to S3 in parallel too, but at the very least, the
> calculation part is being done in parallel. And worst case scenario, I can
> be really bad and just do the write to S3 in the mapConcurrent, and then
> just return the metadata of each write, and just bundle that up with
> collect.
>
>
> And that's ignoring the fact that I can just use the workaround too.
>
>
> Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me
> from a performance perspective, but is rather unintuitive to me from a
> usability perspective. We are told how Streams can process unbounded data
> sets, but when it tries to do a findAny() with parallel(), it runs into an
> OOME because it fetched all the data ahead of time. In fact, almost all of the
> terminal operations will hit an OOME in the exact same way if they are
> parallel and have a big enough data set. It's definitely not the end of the
> world, but it seems that I have to fit everything into a Collector and/or a
> Gatherer if I want to avoid pre-fetching everything.
>
> On Tue, Nov 12, 2024, 6:36 AM Viktor Klang 
> wrote:
>
> Have you considered Gatherers.mapConcurrent(…)?
>
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> --
> *From:* David Alayachew 
> *Sent:* Tuesday, 12 November 2024 01:53
>

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-12 Thread Viktor Klang
>We are told how Streams can process unbounded data sets, but when it tries to 
>do a findAny() with parallel(), it runs into an OOME because it fetched all 
>the data ahead of time. In fact, almost all of the terminal operations will 
>hit an 
>OOME in the exact same way if they are parallel and have a big enough data 
>set. It's definitely not the end of the world, but it seems that I have to fit 
>everything into a Collector and/or a Gatherer if I want to avoid pre-fetching 
>everything.

Yeah, I think it is important to distinguish "can process unbounded data sets" 
from "always able to process unbounded data sets".

Some operations inherently need the end of the stream, so even something simple 
like: stream.distinct() or stream.sorted() can end up pulling in all data 
(which of course won't terminate).

Fortunately, I think Gatherers can unlock many more situations where unbounded 
streams can be processed.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

From: David Alayachew 
Sent: Tuesday, 12 November 2024 15:08
To: Viktor Klang 
Cc: core-libs-dev 
Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching 
too many elements


Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon as I 
thought of it.


I hand-waved away the idea because I thought that the method would turn the 
stream pipeline parallel, thus, recreating the same problem I currently have of 
parallelism causing all of the elements to be fetched ahead of time, causing an 
OOME.


It did NOT occur to me that the pipeline would stay sequential, and just kick 
these off sequentially, but have them executing in parallel. I can't see why I 
came to that incorrect conclusion. I have read the javadocs of this method 
several times. Though, to be fair, I came to the same, incorrect conclusion 
about Collectors.groupingByConcurrent(), and it wasn't until someone pointed 
out what the documentation was actually saying that I realized its true 
properties.

Thanks. That definitely solves at least part of my problem. Obviously, I would 
prefer to write to S3 in parallel too, but at the very least, the calculation 
part is being done in parallel. And worst case scenario, I can be really bad 
and just do the write to S3 in the mapConcurrent, and then just return the 
metadata of each write, and just bundle that up with collect.


And that's ignoring the fact that I can just use the workaround too.


Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me from a 
performance perspective, but is rather unintuitive to me from a usability 
perspective. We are told how Streams can process unbounded data sets, but when 
it tries to do a findAny() with parallel(), it runs into an OOME because it 
fetched all the data ahead of time. In fact, almost all of the terminal operations 
will hit an OOME in the exact same way if they are parallel and have a big 
enough data set. It's definitely not the end of the world, but it seems that I 
have to fit everything into a Collector and/or a Gatherer if I want to avoid 
pre-fetching everything.

On Tue, Nov 12, 2024, 6:36 AM Viktor Klang  wrote:
Have you considered Gatherers.mapConcurrent(…)?


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

From: David Alayachew 
Sent: Tuesday, 12 November 2024 01:53
To: Viktor Klang 
Cc: core-libs-dev 
Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching 
too many elements

Good to know, ty vm.

At the very least, I have this workaround. This will meet my needs for now.

I guess my final question would be -- is this type of problem better suited to 
something besides parallel streams? Maybe an ExecutorService?

Really, all I am doing is taking a jumbo file, splitting it into batches, and 
then doing some work on those batches. My IO speeds are pretty fast, and the 
compute work is non-trivial, so there is performance being left on the table if 
I give up parallelism. And I am in a position where completion time is very 
important to us.

I just naturally assumed parallel streams were the right choice because the 
compute work is simple. A pure function that I can break out, and then call in 
a map. Once I do that, I just call forEach to write the batches back out to S3. 
Maybe I should look into a different part of the std lib instead because I am 
using the wrong tool for the job? My nose says ExecutorService, but I figure I 
should ask before I dive too deep in.


On Mon, Nov 11, 2024, 2:34 PM Viktor Klang  wrote:
You're most welcome!

In a potential future where all intermediate operations are Gatherer-based, 

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-12 Thread David Alayachew
Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon
as I thought of it.


I hand-waved away the idea because I thought that the method would turn the
stream pipeline parallel, thus, recreating the same problem I currently
have of parallelism causing all of the elements to be fetched ahead of
time, causing an OOME.


It did NOT occur to me that the pipeline would stay sequential, and just
kick these off sequentially, but have them executing in parallel. I can't
see why I came to that incorrect conclusion. I have read the javadocs of
this method several times. Though, to be fair, I came to the same,
incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't
until someone pointed out what the documentation was actually saying that I
realized its true properties.

Thanks. That definitely solves at least part of my problem. Obviously, I
would prefer to write to S3 in parallel too, but at the very least, the
calculation part is being done in parallel. And worst case scenario, I can
be really bad and just do the write to S3 in the mapConcurrent, and then
just return the metadata of each write, and just bundle that up with
collect.
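
In code, that worst-case shape would be roughly the following (a sketch; 
uploadBatchToS3, UploadResult, and MAX_CONCURRENCY are placeholders, not 
anything from the actual codebase):

// the pipeline itself stays sequential; mapConcurrent runs up to
// MAX_CONCURRENCY uploads at a time on virtual threads
List<UploadResult> results = stream
    .gather(Gatherers.windowFixed(BATCH_SIZE))
    .gather(Gatherers.mapConcurrent(MAX_CONCURRENCY,
            batch -> uploadBatchToS3(batch))) // returns each write's metadata
    .collect(Collectors.toList());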


And that's ignoring the fact that I can just use the workaround too.


Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me
from a performance perspective, but is rather unintuitive to me from a
usability perspective. We are told how Streams can process unbounded data
sets, but when it tries to do a findAny() with parallel(), it runs into an
OOME because it fetched all the data ahead of time. In fact, almost all of the
terminal operations will hit an OOME in the exact same way if they are
parallel and have a big enough data set. It's definitely not the end of the
world, but it seems that I have to fit everything into a Collector and/or a
Gatherer if I want to avoid pre-fetching everything.

On Tue, Nov 12, 2024, 6:36 AM Viktor Klang  wrote:

> Have you considered Gatherers.mapConcurrent(…)?
>
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> --
> *From:* David Alayachew 
> *Sent:* Tuesday, 12 November 2024 01:53
> *To:* Viktor Klang 
> *Cc:* core-libs-dev 
> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
> Good to know, ty vm.
>
> At the very least, I have this workaround. This will meet my needs for now.
>
> I guess my final question would be -- is this type of problem better
> suited to something besides parallel streams? Maybe an ExecutorService?
>
> Really, all I am doing is taking a jumbo file, splitting it into batches,
> and then doing some work on those batches. My IO speeds are pretty fast,
> and the compute work is non-trivial, so there is performance being left on
> the table if I give up parallelism. And I am in a position where completion
> time is very important to us.
>
> I just naturally assumed parallel streams were the right choice because
> the compute work is simple. A pure function that I can break out, and then
> call in a map. Once I do that, I just call forEach to write the batches
> back out to S3. Maybe I should look into a different part of the std lib
> instead because I am using the wrong tool for the job? My nose says
> ExecutorService, but I figure I should ask before I dive too deep in.
>
>
> On Mon, Nov 11, 2024, 2:34 PM Viktor Klang 
> wrote:
>
> You're most welcome!
>
> In a potential future where all intermediate operations are
> Gatherer-based, and all terminal operations are Collector-based, it would
> just work as expected. But with that said, I'm not sure it is practically
> achievable because some operations might not have the same
> performance-characteristics as before.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> --
> *From:* David Alayachew 
> *Sent:* Monday, 11 November 2024 18:32
> *To:* Viktor Klang 
> *Cc:* core-libs-dev 
> *Subject:* [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
>
> Thanks for the workaround. It's running beautifully.
>
> Is there a future where this island concept is extended to the rest of
> streams? Tbh, I don't fully understand it.
>
> On Mon, Nov 11, 2024, 9:59 AM Viktor Klang 
> wrote:
>
> Hi David,
>
> This is the effect of how parallel streams are implemented, where
> different stages, which are not representable as a join-less Spliterator,
> are executed as a series of "islands" where the next isn't started until
> the former has completed.
>
> If you think about it, parallelization of a Stream works be

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-12 Thread Viktor Klang
Have you considered Gatherers.mapConcurrent(…)?


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

From: David Alayachew 
Sent: Tuesday, 12 November 2024 01:53
To: Viktor Klang 
Cc: core-libs-dev 
Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching 
too many elements

Good to know, ty vm.

At the very least, I have this workaround. This will meet my needs for now.

I guess my final question would be -- is this type of problem better suited to 
something besides parallel streams? Maybe an ExecutorService?

Really, all I am doing is taking a jumbo file, splitting it into batches, and 
then doing some work on those batches. My IO speeds are pretty fast, and the 
compute work is non-trivial, so there is performance being left on the table if 
I give up parallelism. And I am in a position where completion time is very 
important to us.

I just naturally assumed parallel streams were the right choice because the 
compute work is simple. A pure function that I can break out, and then call in 
a map. Once I do that, I just call forEach to write the batches back out to S3. 
Maybe I should look into a different part of the std lib instead because I am 
using the wrong tool for the job? My nose says ExecutorService, but I figure I 
should ask before I dive too deep in.


On Mon, Nov 11, 2024, 2:34 PM Viktor Klang  wrote:
You're most welcome!

In a potential future where all intermediate operations are Gatherer-based, and 
all terminal operations are Collector-based, it would just work as expected. 
But with that said, I'm not sure it is practically achievable because some 
operations might not have the same performance-characteristics as before.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

From: David Alayachew 
Sent: Monday, 11 November 2024 18:32
To: Viktor Klang 
Cc: core-libs-dev 
Subject: [External] : Re: Question about Streams, Gatherers, and fetching too 
many elements


Thanks for the workaround. It's running beautifully.

Is there a future where this island concept is extended to the rest of streams? 
Tbh, I don't fully understand it.

On Mon, Nov 11, 2024, 9:59 AM Viktor Klang  wrote:
Hi David,

This is the effect of how parallel streams are implemented, where different 
stages, which are not representable as a join-less Spliterator, are executed as 
a series of "islands" where the next isn't started until the former has 
completed.

If you think about it, parallelization of a Stream works best when the entire 
data set can be split amongst a set of worker threads, and that sort of implies 
that you want eager pre-fetch of data, so if your dataset does not fit in 
memory, that is likely to lead to less desirable outcomes.

What I was able to do for Gatherers is to implement "gather(…) + 
collect(…)"-fusion so any number of consecutive gather(…)-operations 
immediately followed by a collect(…) is run in the same "island".

So with that said, you could try something like the following:

static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
    return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l,
        (v) -> null, Collector.Characteristics.IDENTITY_FINISH);
}


stream
.parallel()
.unordered()
.gather(Gatherers.windowFixed(BATCH_SIZE))
.collect(forEach(eachList -> println(eachList.getFirst())));


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

From: core-libs-dev  on behalf of David Alayachew 
Sent: Monday, 11 November 2024 14:52
To: core-libs-dev 
Subject: Re: Question about Streams, Gatherers, and fetching too many elements

And just to avoid the obvious question, I can hold about 30 batches in memory 
before the Out of Memory error occurs. So this is not an issue of my batch size 
being too high.

But just to confirm, I set the batch size to 1, and it still ran into an out of 
memory error. So I feel fairly confident saying that the Gatherer is trying to 
grab all available data before sending any of it downstream.

On Mon, Nov 11, 2024, 8:46 AM David Alayachew  wrote:
Hello Core Libs Dev Team,

I was trying out Gatherers for a project at work, and ran into a rather sad 
scenario.

I need to process a large file in batches. Each batch is small enough that I 
can hold it in memory, but I cannot hold the entire file (and thus, all of the 
batches) in memory at once.

Looking at the Gatherers API, I saw windowFixed and thought that it would be a 
great match for my use case.

Ho

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-11 Thread David Alayachew
Good to know, ty vm.

At the very least, I have this workaround. This will meet my needs for now.

I guess my final question would be -- is this type of problem better suited
to something besides parallel streams? Maybe an ExecutorService?

Really, all I am doing is taking a jumbo file, splitting it into batches,
and then doing some work on those batches. My IO speeds are pretty fast,
and the compute work is non-trivial, so there is performance being left on
the table if I give up parallelism. And I am in a position where completion
time is very important to us.

I just naturally assumed parallel streams were the right choice because the
compute work is simple. A pure function that I can break out, and then call
in a map. Once I do that, I just call forEach to write the batches back out
to S3. Maybe I should look into a different part of the std lib instead
because I am using the wrong tool for the job? My nose says
ExecutorService, but I figure I should ask before I dive too deep in.
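
For reference, the ExecutorService shape I have in mind is roughly this 
(a sketch; readBatches and handleBatch stand in for the real IO and compute, 
and MAX_IN_FLIGHT caps how many batches are in memory at once):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

try (ExecutorService pool = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors())) {
    Semaphore inFlight = new Semaphore(MAX_IN_FLIGHT); // bounds queued batches
    readBatches(jumboFile).forEach(batch -> {
        inFlight.acquireUninterruptibly(); // back-pressure: reader blocks when full
        pool.submit(() -> {
            try {
                handleBatch(batch); // pure compute, then the S3 write
            } finally {
                inFlight.release();
            }
        });
    });
} // close() waits for submitted tasks; ExecutorService is AutoCloseable since Java 19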


On Mon, Nov 11, 2024, 2:34 PM Viktor Klang  wrote:

> You're most welcome!
>
> In a potential future where all intermediate operations are
> Gatherer-based, and all terminal operations are Collector-based, it would
> just work as expected. But with that said, I'm not sure it is practically
> achievable because some operations might not have the same
> performance-characteristics as before.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> --
> *From:* David Alayachew 
> *Sent:* Monday, 11 November 2024 18:32
> *To:* Viktor Klang 
> *Cc:* core-libs-dev 
> *Subject:* [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
>
> Thanks for the workaround. It's running beautifully.
>
> Is there a future where this island concept is extended to the rest of
> streams? Tbh, I don't fully understand it.
>
> On Mon, Nov 11, 2024, 9:59 AM Viktor Klang 
> wrote:
>
> Hi David,
>
> This is the effect of how parallel streams are implemented, where
> different stages, which are not representable as a join-less Spliterator,
> are executed as a series of "islands" where the next isn't started until
> the former has completed.
>
> If you think about it, parallelization of a Stream works best when the
> entire data set can be split amongst a set of worker threads, and that sort
> of implies that you want eager pre-fetch of data, so if your dataset does
> not fit in memory, that is likely to lead to less desirable outcomes.
>
> What I was able to do for Gatherers is to implement "gather(…) +
> collect(…)"-fusion so any number of consecutive gather(…)-operations
> immediately followed by a collect(…) is run in the same "island".
>
> So with that said, you could try something like the following:
>
> static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
>     return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l,
>         (v) -> null, Collector.Characteristics.IDENTITY_FINISH);
> }
>
>
> stream
> .parallel()
> .unordered()
> .gather(Gatherers.windowFixed(BATCH_SIZE))
> .collect(forEach(eachList -> println(eachList.getFirst())));
>
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> --
> *From:* core-libs-dev  on behalf of David
> Alayachew 
> *Sent:* Monday, 11 November 2024 14:52
> *To:* core-libs-dev 
> *Subject:* Re: Question about Streams, Gatherers, and fetching too many
> elements
>
> And just to avoid the obvious question, I can hold about 30 batches in
> memory before the Out of Memory error occurs. So this is not an issue of my
> batch size being too high.
>
> But just to confirm, I set the batch size to 1, and it still ran into an
> out of memory error. So I feel fairly confident saying that the Gatherer is
> trying to grab all available data before sending any of it downstream.
>
> On Mon, Nov 11, 2024, 8:46 AM David Alayachew 
> wrote:
>
> Hello Core Libs Dev Team,
>
> I was trying out Gatherers for a project at work, and ran into a rather
> sad scenario.
>
> I need to process a large file in batches. Each batch is small enough that
> I can hold it in memory, but I cannot hold the entire file (and thus, all
> of the batches) in memory at once.
>
> Looking at the Gatherers API, I saw windowFixed and thought that it would
> be a great match for my use case.
>
> However, when trying it out, I was disappointed to see that it ran out of
> memory very quickly. Here is my attempt at using it.
>
> stream
> .parallel()
> .unordered()
> .gather(Gatherers.windowFixed(BATCH_SIZE))
> .forEach(eachList -> println(eachList.getFirst()))
> ;
>
> As you can see, I am just splitting the file into batches, and printing
> out the first of each batch. This is purely for example's sake, of course.
> I had planned on building even more functionality on top of this, but I
> couldn't even get past this example.
>
> But anyways, not even a single one of them printed out. Which leads me to

Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements

2024-11-11 Thread Viktor Klang
You're most welcome!

In a potential future where all intermediate operations are Gatherer-based, and 
all terminal operations are Collector-based, it would just work as expected. 
But with that said, I'm not sure it is practically achievable because some 
operations might not have the same performance-characteristics as before.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

From: David Alayachew 
Sent: Monday, 11 November 2024 18:32
To: Viktor Klang 
Cc: core-libs-dev 
Subject: [External] : Re: Question about Streams, Gatherers, and fetching too 
many elements


Thanks for the workaround. It's running beautifully.

Is there a future where this island concept is extended to the rest of streams? 
Tbh, I don't fully understand it.

On Mon, Nov 11, 2024, 9:59 AM Viktor Klang  wrote:
Hi David,

This is the effect of how parallel streams are implemented, where different 
stages, which are not representable as a join-less Spliterator, are executed as 
a series of "islands" where the next isn't started until the former has 
completed.

If you think about it, parallelization of a Stream works best when the entire 
data set can be split amongst a set of worker threads, and that sort of implies 
that you want eager pre-fetch of data, so if your dataset does not fit in 
memory, that is likely to lead to less desirable outcomes.

What I was able to do for Gatherers is to implement "gather(…) + 
collect(…)"-fusion so any number of consecutive gather(…)-operations 
immediately followed by a collect(…) is run in the same "island".

So with that said, you could try something like the following:

static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
    return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l,
        (v) -> null, Collector.Characteristics.IDENTITY_FINISH);
}


stream
.parallel()
.unordered()
.gather(Gatherers.windowFixed(BATCH_SIZE))
.collect(forEach(eachList -> println(eachList.getFirst())));


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

From: core-libs-dev  on behalf of David Alayachew 
Sent: Monday, 11 November 2024 14:52
To: core-libs-dev 
Subject: Re: Question about Streams, Gatherers, and fetching too many elements

And just to avoid the obvious question, I can hold about 30 batches in memory 
before the Out of Memory error occurs. So this is not an issue of my batch size 
being too high.

But just to confirm, I set the batch size to 1, and it still ran into an out of 
memory error. So I feel fairly confident saying that the Gatherer is trying to 
grab all available data before sending any of it downstream.

On Mon, Nov 11, 2024, 8:46 AM David Alayachew  wrote:
Hello Core Libs Dev Team,

I was trying out Gatherers for a project at work, and ran into a rather sad 
scenario.

I need to process a large file in batches. Each batch is small enough that I 
can hold it in memory, but I cannot hold the entire file (and thus, all of the 
batches) in memory at once.

Looking at the Gatherers API, I saw windowFixed and thought that it would be a 
great match for my use case.

However, when trying it out, I was disappointed to see that it ran out of 
memory very quickly. Here is my attempt at using it.

stream
.parallel()
.unordered()
.gather(Gatherers.windowFixed(BATCH_SIZE))
.forEach(eachList -> println(eachList.getFirst()))
;

As you can see, I am just splitting the file into batches, and printing out the 
first of each batch. This is purely for example's sake, of course. I had 
planned on building even more functionality on top of this, but I couldn't even 
get past this example.

But anyways, not even a single one of them printed out. Which leads me to 
believe that it's pulling all of them in the Gatherer.

I can get it to run successfully if I go sequentially, but not parallel. 
Parallel gives me that out of memory error.

Is there any way for me to be able to have the Gatherer NOT pull in everything 
while still remaining parallel and unordered?

Thank you for your time and help.
David Alayachew