Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-03-13 Thread Kenneth Knowles
Closing the loop, I went with two URNs and an associated payload in
https://github.com/apache/beam/pull/30545

Kenn

Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-03-06 Thread Kenneth Knowles
OK of course, hacking this up, there's already a combinatorial 2x2 that
perhaps people were alluding to but I missed.

RedistributeByKey (user's choice)
RedistributeArbitrarily (runner's choice! default may be random keys but
that is not required)

RedistributeArbitrarilyAllowingDuplicates (this is the use case I am trying
to get at with the design & impl - basically runner's choice and also no
need to dedup or persist)
RedistributeByKeyAllowingDuplicates (is this an important use case? I don't
know - if so, then it points to some future where you tag any transform
with this)

So now I kind of want to have two URNs (one per input/output type) and a
config that allows duplicates.

WDYT? Do the people who liked having separate URNs want to have 4 URNs? We
can still have whatever end-user SDK interface we need to have regardless.
I think in Java we want it to look like this regardless:

Redistribute.arbitrarily()
Redistribute.byKey()
Redistribute.arbitrarily().allowingDuplicates()
Redistribute.byKey().allowingDuplicates()

And Python

beam.Redistribute()
beam.RedistributeByKey()
beam.Redistribute(allowDuplicates=True)
beam.RedistributeByKey(allowDuplicates=True)

I'll add end-user APIs to the design doc (and ask for help on Python and Go
idioms) but they are pretty short and sweet.
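For illustration only (this is not the Beam implementation, and all names below are invented): the runner's-choice variant can be pictured as keying each element randomly, regrouping by key, and dropping the keys, which preserves the element multiset while letting parallelism change. A minimal Python simulation:

```python
import random
from collections import Counter, defaultdict

def redistribute_arbitrarily(elements, num_shards=4, seed=None):
    """Simulate a random-key redistribute: key, regroup, drop keys.

    The regrouping step is where a runner could change parallelism;
    the output multiset equals the input multiset.
    """
    rng = random.Random(seed)
    shards = defaultdict(list)
    for element in elements:
        shards[rng.randrange(num_shards)].append(element)  # random key
    # Concatenation order across shards is arbitrary -- that is the point.
    return [element for shard in shards.values() for element in shard]

data = ["a", "b", "b", "c", "d"]
out = redistribute_arbitrarily(data, seed=42)
assert Counter(out) == Counter(data)  # same elements, possibly reordered
```

The duplicate-allowing variants would relax exactly the last assertion: the output could contain extra copies of input elements.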

Kenn


Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-02-08 Thread Robert Burke
Was that only October? Wow.

Option 2 SGTM, with the adjustment of making the core of the URN
"redistribute_allowing_duplicates" instead of building from the unspecified
Reshuffle semantics.

Transforms getting updated to use the new transform can have their
@RequiresStableInput annotation added accordingly if they need that
property, per previous discussions.
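A hedged, pure-Python sketch of why stable input matters in this context (the sink, retry loop, and record shapes are all invented for illustration, and this is not Beam's retry machinery): if a retried write replays nondeterministic input, its duplicates differ by value and cannot be deduplicated; with stable input the retry replays identical records.

```python
import random

def write_with_retry(make_batch, sink, attempts=2):
    """A naive sink that is retried after writing, replaying its input
    each time. With nondeterministic input the replay writes different
    records; with stable input the retry is an identical duplicate."""
    for _ in range(attempts):
        sink.extend(make_batch())

# Nondeterministic upstream: each retry sees a freshly generated value.
unstable_sink = []
write_with_retry(lambda: [("rec", random.random())], unstable_sink)

# Stable upstream: retries replay identical elements.
stable_sink = []
write_with_retry(lambda: [("rec", 0.5)], stable_sink)

assert stable_sink[0] == stable_sink[1]  # duplicates are identical by value
# unstable_sink's two records almost surely differ, so they cannot be
# deduplicated by value -- the hazard that stable input prevents.
```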




Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-02-08 Thread Kenneth Knowles
On Wed, Feb 7, 2024 at 5:15 PM Robert Burke  wrote:

> OK, so my stance is a configurable Reshuffle might be interesting, so my
> vote is +1, along the following lines.
>
> 1. Use a new URN (beam:transform:reshuffle:v2) and attach a new
> ReshufflePayload to it.
>

Ah, I see there's more than one variation of the "new URN" approach.
Namely, you have a new version of an existing URN prefix, while I had in
mind that it was a totally new base URN. In other words the open question I
meant to pose is between these options:

1. beam:transform:reshuffle:v2 + { allowing_duplicates: true }
2. beam:transform:reshuffle_allowing_duplicates:v1 {}

The most compelling argument in favor of option 2 is that it could have a
distinct payload type associated with the different URN (maybe parameters
around tweaking how much duplication? I don't know... I actually expect
neither payload to evolve much if at all).
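The two encodings can be contrasted concretely. The URN strings below are taken from the options above; the payload field name `allowing_duplicates` and the dispatch function are hypothetical sketches of how a runner might consume either option:

```python
# Option 1: one URN, behavior toggled by a payload field.
option1 = {
    "urn": "beam:transform:reshuffle:v2",
    "payload": {"allowing_duplicates": True},
}

# Option 2: a distinct URN per behavior, each with its own (empty) payload.
option2 = {
    "urn": "beam:transform:reshuffle_allowing_duplicates:v1",
    "payload": {},
}

def allows_duplicates(transform):
    """How a hypothetical runner might answer the one question that
    matters here, under either encoding."""
    if transform["urn"] == "beam:transform:reshuffle_allowing_duplicates:v1":
        return True
    if transform["urn"] == "beam:transform:reshuffle:v2":
        return transform["payload"].get("allowing_duplicates", False)
    return False

assert allows_duplicates(option1) and allows_duplicates(option2)
```

Either way the runner-side check is one branch; the difference is whether future knobs live in one evolving payload or in further URNs.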

There were also two comments in favor of option 2 on the design doc.

>   -> Unknown "urns for composite transforms" already default to the
> subtransform graph implementation for most (all?) runners.
>   -> Having a payload to toggle this behavior then can have whatever
> desired behavior we like. It also allows for additional configurations
> added in later on. This is preferable to a plethora of one-off urns IMHO.
> We can have SDKs gate configuration combinations as needed if additional
> ones appear.
>
> 2. It's very cheap to add but also ignore, as the default is "Do what
> we're already doing without change", and not all SDKs need to add it right
> away. It's more important that the portable way is defined at least, so
> it's easy for other SDKs to add and handle it.
>
> I would prefer we have a clear starting point on what Reshuffle does
> though. I remain a fan of "The Reshuffle (v2) Transform is a user
> designated hint to a runner for a change in parallelism. By default, it
> produces an output PCollection that has the same elements as the input
> PCollection".
>

+1, this is a better phrasing of the spec I propose in
https://s.apache.org/beam-redistribute, but let's not get into it here if
we can, and just evaluate the delta from that design to
https://s.apache.org/beam-reshuffle-allowing-duplicates

Kenn



Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-02-07 Thread Robert Burke
OK, so my stance is a configurable Reshuffle might be interesting, so my vote 
is +1, along the following lines.

1. Use a new URN (beam:transform:reshuffle:v2) and attach a new 
ReshufflePayload to it.
  -> Unknown "urns for composite transforms" already default to the 
subtransform graph implementation for most (all?) runners.
  -> Having a payload to toggle this behavior then can have whatever desired 
behavior we like. It also allows for additional configurations added in later 
on. This is preferable to a plethora of one-off urns IMHO. We can have SDKs 
gate configuration combinations as needed if additional ones appear.
 
2. It's very cheap to add but also ignore, as the default is "Do what we're 
already doing without change", and not all SDKs need to add it right away. It's 
more important that the portable way is defined at least, so it's easy for 
other SDKs to add and handle it.

I would prefer we have a clear starting point on what Reshuffle does though. I 
remain a fan of "The Reshuffle (v2) Transform is a user designated hint to a 
runner for a change in parallelism. By default, it produces an output 
PCollection that has the same elements as the input PCollection".
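A sketch of the payload idea under these assumptions (the name `ReshufflePayload` comes from point 1 above; the field name and default are illustrative, not a committed schema): the zero-value payload means "do what Reshuffle already does", so SDKs and runners that ignore it remain correct, and only opted-in configurations change behavior.

```python
from dataclasses import dataclass

@dataclass
class ReshufflePayload:
    """Hypothetical payload for a beam:transform:reshuffle:v2 URN.

    The default (all-False) payload preserves today's behavior, which
    is what makes the change cheap to add but also cheap to ignore.
    """
    allowing_duplicates: bool = False

default = ReshufflePayload()  # equivalent to today's Reshuffle
opted_in = ReshufflePayload(allowing_duplicates=True)

assert not default.allowing_duplicates
assert opted_in.allowing_duplicates
```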

It remains an open question about what that means for checkpointing/durability 
behavior, but that's largely been runner dependent anyway. I admit the above 
definition is biased by the uses of Reshuffle I'm aware of, which largely are 
to incur a fusion break in the execution graph.

Robert Burke
Beam Go Busybody


Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-01-31 Thread Kenneth Knowles
On Wed, Jan 31, 2024 at 4:21 AM Jan Lukavský  wrote:

> Hi,
>
> if I understand this proposal correctly, the motivation is actually
> reducing latency by bypassing bundle atomic guarantees, bundles after "at
> least once" Reshuffle would be reconstructed independently of the
> pre-shuffle bundling. Provided this is correct, it seems that the behavior
> is slightly more general than for the case of Reshuffle. We have already
> some transforms that manipulate a specific property of a PCollection -
> whether it may or may not contain duplicates. That is manipulated in two ways -
> explicitly removing duplicates based on IDs on sources that generate
> duplicates and using @RequiresStableInput, mostly in sinks. These
> techniques modify an inherent property of a PCollection, that is if it
> contains or does not contain possible duplicates originating from the same
> input element.
>
> There are two types of duplicates - duplicate elements in _different
> bundles_ (typically from at-least-once sources) and duplicates arising due
> to bundle reprocessing (affecting only transforms with side-effects, that
> is what we solve by @RequiresStableInput). The point I'm trying to get to -
> should we add these properties to PCollections (contains cross-bundle
> duplicates vs. does not) and PTransforms ("outputs deduplicated elements"
> and "requires stable input")? That would allow us to analyze the Pipeline
> DAG and provide appropriate implementation for Reshuffle automatically, so
> that a new URN or flag would not be needed. Moreover, this might be useful
> for a broader range of optimizations.
>
> WDYT?
>
These are interesting ideas that could be useful. I think they achieve a
different goal in my case. I actually want to explicitly allow
Reshuffle.allowingDuplicates() to skip expensive parts of its
implementation that are used to prevent duplicates.

The property that would make it possible to automate this in the case of
combiners, or at least validate that the pipeline still gives 100% accurate
answers, would be something like @InsensitiveToDuplicateElements which is
longer and less esoteric than @Idempotent. For situations where there is a
source or sink that only has at-least-once guarantees then yea maybe the
property "has duplicates" will let you know that you may as well use the
duplicating reshuffle without any loss. But still, you may not want to
introduce *more* duplicates.
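A small illustration of the property being described (plain Python, no Beam; the annotation names above are proposals, not existing API): an idempotent-style combine such as max is insensitive to duplicate elements, while sum is not.

```python
data = [3, 1, 4, 1, 5]
duplicated = data + [4, 5, 5]  # same elements, some delivered twice

# max is insensitive to duplicate elements: re-delivering an element
# cannot change the result, so it tolerates an at-least-once reshuffle.
assert max(duplicated) == max(data)

# sum is sensitive: duplicates silently change the answer, so such a
# combiner still needs the deduplication that Reshuffle normally implies.
assert sum(duplicated) != sum(data)

# Distinct-count via a set is also insensitive.
assert len(set(duplicated)) == len(set(data))
```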

I would say my proposal is a step in this direction that would gain some
experience and tools that we might later use in a more automated way.

Kenn


Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-01-31 Thread Kenneth Knowles
On Tue, Jan 30, 2024 at 5:22 PM Robert Burke  wrote:

> Is the benefit of this proposal just the bounded deviation from the
> existing reshuffle?
>
> Reshuffle is already rather dictated by arbitrary runner choice, from
> simply ignoring the node, to forcing a materialization break, to a full
> shuffle implementation which has additional side effects.
>
> But model wise I don't believe it guarantees specific checkpointing or
> re-execution behavior as currently specified. The proto only says it
> represents the operation (without specifying the behavior, that is a big
> problem).
>

Indeed, the semantics of reshuffle are specified: the output PCollection
has the same elements as the input PCollection. Beam very deliberately
doesn't define operational characteristics. It is entirely possible that
reshuffle is meaningless for a runner. I'm not particularly trying
to re-open that can of worms here...

> I guess my concern here is that it implies/codifies that the existing
> reshuffle has more behavior than it promises outside of the Java SDK.
>
"Allowing duplicates" WRT reshuffle is tricky. It feels like it mostly allows
> an implementation where the inputs into the reshuffle might be
> re-executed, for example. But that's always under the runner's discretion,
> and ultimately it could also prevent even getting the intended benefit of a
> reshuffle (notionally, just a fusion break).
>

My intent is to be exactly as questionable as the current reshuffle, which
is indeed questionable. The semantics of the newly proposed transform are
that the output PCollection contains the same elements as the input
PCollection, possibly with duplicates. That is, the input is a subset of
the output.
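For concreteness, that contract can be modeled outside Beam as a multiset
check. This is a toy sketch, not Beam code, and the function name is made up:

```python
from collections import Counter

def satisfies_allowing_duplicates(inputs, outputs):
    """Toy model of the proposed contract: every input element appears in
    the output at least as many times as in the input; extra copies are
    allowed, but nothing may be dropped."""
    need = Counter(inputs)
    have = Counter(outputs)
    return all(have[e] >= n for e, n in need.items())
```

Note that the plain Reshuffle contract is the special case where `need == have`.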

Is there even a valid way to implement the notion of a reshuffle that leads
> to duplicates outside of a retry/resilience case?
>

Sure! ParDo(x -> { output(x); output(x) })

:-) :-) :-)
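(In Python terms, a minimal sketch of such a duplicating transform, with no
Beam dependency and a made-up name, just to show the output still satisfies
"contains the same elements as the input, possibly with duplicates":)

```python
def duplicating_pardo(elements):
    # Emits every element twice; the input remains a subset of the output.
    for x in elements:
        yield x
        yield x
```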

Kenn


>
> ---
>
> To be clear, I'm not against the proposal. I'm against its being
> built on a non-existent foundation. If the behavior isn't already defined,
> it's impossible to specify a real deviation from it.
>
> I'm all for more specific behaviors if it means we actually clarify what the
> original version is in the protos, since it's news to me (just now, because
> I looked) that the Java reshuffle promises GBK-like side effects. But
> that's a long-deprecated transform without a satisfying replacement for
> its usage, so it may be moot.
>
> Robert Burke
>
>
>
> On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles  wrote:
>
>> Hi all,
>>
>> Just when you thought I had squeezed all the possible interest out of
>> this most boring-seeming of transforms :-)
>>
>> I wrote up a very quick proposal as a doc [1]. It is short enough that I
>> will also put the main idea and main question in this email so you can
>> quickly read. Best to put comments in the doc.
>>
>> Main idea: add a variation of Reshuffle that allows duplicates, aka "at
>> least once", so that users and runners can benefit from efficiency if it is
>> possible
>>
>> Main question: is it best as a parameter to existing reshuffle transforms
>> or as new URN(s)? I have proposed it as a parameter but I think either one
>> could work.
>>
>> I would love feedback on the main idea, main question, or anywhere on the
>> doc.
>>
>> Thanks!
>>
>> Kenn
>>
>> [1] https://s.apache.org/beam-reshuffle-allowing-duplicates
>>
>


Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-01-31 Thread Jan Lukavský

Hi,

if I understand this proposal correctly, the motivation is actually
reducing latency by bypassing bundle-atomicity guarantees: bundles after
an "at least once" Reshuffle would be reconstructed independently of the
pre-shuffle bundling. Provided this is correct, the behavior seems
slightly more general than the Reshuffle case. We already have some
transforms that manipulate a specific property of a PCollection -
whether or not it may contain duplicates. That is manipulated in two
ways: explicitly removing duplicates based on IDs in sources that
generate duplicates, and using @RequiresStableInput, mostly in sinks.
These techniques modify an inherent property of a PCollection, namely
whether or not it may contain duplicates originating from the same
input element.


There are two types of duplicates: duplicate elements in _different
bundles_ (typically from at-least-once sources) and duplicates arising
from bundle reprocessing (affecting only transforms with side effects;
that is what we solve with @RequiresStableInput). The point I'm trying to
get to: should we add these properties to PCollections ("contains
cross-bundle duplicates" vs. "does not") and PTransforms ("outputs
deduplicated elements" and "requires stable input")? That would allow us
to analyze the pipeline DAG and provide an appropriate implementation for
Reshuffle automatically, so that a new URN or flag would not be needed.
Moreover, this might be useful for a broader range of optimizations.
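As a rough illustration of what such an analysis could look like (purely
hypothetical names and structure, not Beam APIs), consider a linear pipeline
where each transform declares those two properties:

```python
from dataclasses import dataclass

@dataclass
class Transform:
    name: str
    outputs_deduplicated: bool = False   # e.g. a source that dedups by ID
    requires_stable_input: bool = False  # e.g. a non-idempotent sink

def reshuffle_may_allow_duplicates(pipeline):
    """For each Reshuffle in a linear pipeline, decide whether the cheap
    at-least-once implementation is safe: it is, unless some downstream
    transform requires stable input before the next dedup point."""
    decisions = {}
    for i, t in enumerate(pipeline):
        if t.name != "Reshuffle":
            continue
        safe = True
        for downstream in pipeline[i + 1:]:
            if downstream.requires_stable_input:
                safe = False
                break
            if downstream.outputs_deduplicated:
                break  # duplicates removed here; later transforms unaffected
        decisions[i] = safe
    return decisions
```

A real analysis would of course have to walk a DAG with branches and merges
rather than a list, but the property propagation is the same idea.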


WDYT?

 Jan

On 1/30/24 23:22, Robert Burke wrote:
Is the benefit of this proposal just the bounded deviation from the 
existing reshuffle?


Reshuffle is already rather dictated by arbitrary runner choice, from 
simply ignoring the node, to forcing a materialization break, to a 
full shuffle implementation which has additional side effects.


But model wise I don't believe it guarantees specific checkpointing or 
re-execution behavior as currently specified. The proto only says it 
represents the operation (without specifying the behavior, that is a 
big problem).


I guess my concern here is that it implies/codifies that the existing 
reshuffle has more behavior than it promises outside of the Java SDK.


"Allowing duplicates" WRT reshuffle is tricky. It feels like it mostly 
allows an implementation where the inputs into the reshuffle might be 
re-executed, for example. But that's always under the runner's 
discretion, and ultimately it could also prevent even getting the 
intended benefit of a reshuffle (notionally, just a fusion break).


Is there even a valid way to implement the notion of a reshuffle that 
leads to duplicates outside of a retry/resilience case?


---

To be clear, I'm not against the proposal. I'm against its being 
built on a non-existent foundation. If the behavior isn't already 
defined, it's impossible to specify a real deviation from it.


I'm all for more specific behaviors if it means we actually clarify what 
the original version is in the protos, since it's news to me (just 
now, because I looked) that the Java reshuffle promises GBK-like side 
effects. But that's a long-deprecated transform without a satisfying 
replacement for its usage, so it may be moot.


Robert Burke



On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles  wrote:

Hi all,

Just when you thought I had squeezed all the possible interest out
of this most boring-seeming of transforms :-)

I wrote up a very quick proposal as a doc [1]. It is short enough
that I will also put the main idea and main question in this email
so you can quickly read. Best to put comments in the doc.

Main idea: add a variation of Reshuffle that allows duplicates,
aka "at least once", so that users and runners can benefit from
efficiency if it is possible

Main question: is it best as a parameter to existing reshuffle
transforms or as new URN(s)? I have proposed it as a parameter but
I think either one could work.

I would love feedback on the main idea, main question, or anywhere
on the doc.

Thanks!

Kenn

[1] https://s.apache.org/beam-reshuffle-allowing-duplicates


Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-01-30 Thread Robert Burke
Is the benefit of this proposal just the bounded deviation from the
existing reshuffle?

Reshuffle is already rather dictated by arbitrary runner choice, from
simply ignoring the node, to forcing a materialization break, to a full
shuffle implementation which has additional side effects.

But model wise I don't believe it guarantees specific checkpointing or
re-execution behavior as currently specified. The proto only says it
represents the operation (without specifying the behavior, that is a big
problem).

I guess my concern here is that it implies/codifies that the existing
reshuffle has more behavior than it promises outside of the Java SDK.

"Allowing duplicates" WRT reshuffle is tricky. It feels like it mostly allows
an implementation where the inputs into the reshuffle might be
re-executed, for example. But that's always under the runner's discretion,
and ultimately it could also prevent even getting the intended benefit of a
reshuffle (notionally, just a fusion break).

Is there even a valid way to implement the notion of a reshuffle that leads
to duplicates outside of a retry/resilience case?

---

To be clear, I'm not against the proposal. I'm against its being built
on a non-existent foundation. If the behavior isn't already defined, it's
impossible to specify a real deviation from it.

I'm all for more specific behaviors if it means we actually clarify what the
original version is in the protos, since it's news to me (just now, because
I looked) that the Java reshuffle promises GBK-like side effects. But
that's a long-deprecated transform without a satisfying replacement for
its usage, so it may be moot.

Robert Burke



On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles  wrote:

> Hi all,
>
> Just when you thought I had squeezed all the possible interest out of this
> most boring-seeming of transforms :-)
>
> I wrote up a very quick proposal as a doc [1]. It is short enough that I
> will also put the main idea and main question in this email so you can
> quickly read. Best to put comments in the doc.
>
> Main idea: add a variation of Reshuffle that allows duplicates, aka "at
> least once", so that users and runners can benefit from efficiency if it is
> possible
>
> Main question: is it best as a parameter to existing reshuffle transforms
> or as new URN(s)? I have proposed it as a parameter but I think either one
> could work.
>
> I would love feedback on the main idea, main question, or anywhere on the
> doc.
>
> Thanks!
>
> Kenn
>
> [1] https://s.apache.org/beam-reshuffle-allowing-duplicates
>