Re: @RequiresStableInput and Pipeline fusion

2022-12-14 Thread Jan Lukavský

Filed https://github.com/apache/beam/issues/24655.

 Jan


Re: @RequiresStableInput and Pipeline fusion

2022-12-13 Thread Luke Cwik via dev
This is definitely not working for portable pipelines, since the
GreedyPipelineFuser doesn't create a fusion boundary. As you pointed out,
this results in a single stage containing a non-deterministic function
followed by one that requires stable input. It seems as though we should
have runners check the requirements on the Pipeline [1] to ensure that
they can faithfully process such a pipeline, and reject anything they
don't support early on.
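A minimal sketch of the up-front check described above, in plain Java with no Beam dependency. It assumes the runner has already extracted the Pipeline's requirement URNs into a list of strings; the class and method names are hypothetical, and the stable-input URN string is an assumption that should be checked against beam_runner_api.proto.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of a runner rejecting pipelines whose requirements it
// cannot honor, before any data is processed. Not Beam's actual code.
class RequirementsCheck {

  // Assumed URN for @RequiresStableInput; verify against beam_runner_api.proto.
  static final String STABLE_INPUT = "beam:requirement:pardo:stable_input:v1";

  // Returns the requirement URNs the runner does not support, sorted.
  static Set<String> unsupported(List<String> requirements, Set<String> supported) {
    Set<String> missing = new TreeSet<>();
    for (String urn : requirements) {
      if (!supported.contains(urn)) {
        missing.add(urn);
      }
    }
    return missing;
  }

  // Fails fast if any requirement cannot be faithfully executed.
  static void validate(List<String> requirements, Set<String> supported) {
    Set<String> missing = unsupported(requirements, supported);
    if (!missing.isEmpty()) {
      throw new UnsupportedOperationException(
          "Runner cannot faithfully execute pipeline; unsupported requirements: " + missing);
    }
  }

  public static void main(String[] args) {
    // A runner that supports no requirements rejects a stable-input pipeline.
    try {
      validate(List.of(STABLE_INPUT), Set.of());
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Rejecting at submission time means a user gets an explicit error instead of silently losing the stable-input guarantee on retry.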

Making the GreedyPipelineFuser insert that fusion break is likely the way
to go. Runners should be able to look at the ParDoPayload
requires_stable_input field for the ExecutableStage to see if any special
handling is necessary on their end before they pass data to that stage.
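To make the proposed fusion break concrete, here is a toy model; it is not GreedyPipelineFuser's actual algorithm. It handles only a linear chain of ParDos, and the Transform record is a hypothetical stand-in whose flag plays the role of ParDoPayload.requires_stable_input.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of greedy fusion over a linear chain of transforms.
// Transform is a hypothetical stand-in, not Beam's proto type.
class ToyFuser {

  record Transform(String name, boolean requiresStableInput) {}

  // Fuses consecutive transforms into stages, but starts a new stage (a fusion
  // break) before any transform that requires stable input, so that stage's
  // input can be materialized before it runs.
  static List<List<String>> fuse(List<Transform> chain) {
    List<List<String>> stages = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (Transform t : chain) {
      if (t.requiresStableInput() && !current.isEmpty()) {
        stages.add(current);
        current = new ArrayList<>();
      }
      current.add(t.name());
    }
    if (!current.isEmpty()) {
      stages.add(current);
    }
    return stages;
  }

  public static void main(String[] args) {
    List<Transform> chain = List.of(
        new Transform("PairWithRandomKeyFn", false),
        new Transform("MakeSideEffectAndThenFailFn", true));
    // Prints the two resulting stages.
    System.out.println(fuse(chain));
  }
}
```

Splitting the chain this way gives the runner a stage boundary at which it can materialize (checkpoint or shuffle) the input of the stable-input stage before passing data to it.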

[1]: https://github.com/apache/beam/blob/77af3237521d94f0399ab405ebac09bbbeded38c/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L111


@RequiresStableInput and Pipeline fusion

2022-12-13 Thread Jan Lukavský

Hi,

I have a question about @RequiresStableInput functionality. I'm trying
to make it work for the portable Flink runner [1], [2]. We have an
integration test (which should probably be turned into a ValidatesRunner
test, but that is a different story) [3]. The test assigns a random key
to an input element, processes it once, fails the pipeline and then
reprocesses it. This works provided there is a checkpoint (a shuffle in
the case of Dataflow) exactly between assigning the random key (via
PairWithRandomKeyFn) and processing the element (via
MakeSideEffectAndThenFailFn).


The problem is that GreedyPipelineFuser fuses the transforms
PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single
ExecutableStage. This stage is then executed with the @RequiresStableInput
requirement, but re-running it obviously assigns a different key to the
reprocessed element(s). It looks like we need to fix this in the
PipelineFuser; is that right? Does this mean the @RequiresStableInput
functionality is actually broken for all runners that use the default
fusion?
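The failure mode can be reproduced without Beam. In this hypothetical simulation a counter stands in for the random key generator in PairWithRandomKeyFn (every call yields a new key), and the two loop iterations stand for the failed attempt and the retry of MakeSideEffectAndThenFailFn. None of the names are Beam API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of replay with and without a checkpoint between a
// non-deterministic step and a step that requires stable input.
class ReplaySimulation {

  // Stand-in for non-deterministic key assignment: each call returns a new key.
  static final AtomicInteger keySource = new AtomicInteger();

  static String assignKey(String element) {
    return keySource.incrementAndGet() + ":" + element;
  }

  // Returns the keyed input seen by the side-effecting step on each of two
  // attempts. With a checkpoint, the key is computed once and replayed from the
  // materialized copy; when fused, the key is recomputed on the retry.
  static String[] runTwoAttempts(String element, boolean checkpointBetweenSteps) {
    String checkpointed = checkpointBetweenSteps ? assignKey(element) : null;
    String[] seen = new String[2];
    for (int attempt = 0; attempt < 2; attempt++) {
      // Attempt 0 is the run that fails after its side effect; attempt 1 retries.
      seen[attempt] = checkpointBetweenSteps ? checkpointed : assignKey(element);
    }
    return seen;
  }

  public static void main(String[] args) {
    String[] stable = runTwoAttempts("x", true);
    String[] fused = runTwoAttempts("x", false);
    System.out.println("checkpointed: " + stable[0] + " / " + stable[1]);
    System.out.println("fused:        " + fused[0] + " / " + fused[1]);
  }
}
```

With a checkpoint between the steps both attempts see the same key; when the steps are fused, the retry sees a new key, which is exactly the instability @RequiresStableInput is meant to rule out.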


Another possibility is to fix the test by adding an explicit reshuffle
(I verified that this works), but I think the test is actually correct:
users would probably not expect transforms to be fused across the
@RequiresStableInput boundary.


Thoughts?

 Jan


[1] https://github.com/apache/beam/issues/20812
[2] https://github.com/apache/beam/pull/22889
[3] https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java