Re: How do side inputs relate to stage fusion?

2023-12-15 Thread Chamikara Jayalath via dev
Created a related feature request: https://github.com/apache/beam/issues/29789

We have to put more thought into exactly how to come up with merged
environments that do not result in conflicts. I prefer trying to
automatically do this on the SDK side instead of pushing the complexity to
the user (for example, isolating dependencies within the same environment
using classloaders for Java).

Thanks,
Cham

On Fri, Dec 15, 2023 at 1:36 PM Joey Tran  wrote:

> Yeah, we already have `ResourceHint.get_merged_value(cls, outer_value,
> inner_value)` for reconciling resources within a composite, in the future
> we could possibly just have another similar method and have the environment
> merging logic hook into that.
>
> On Fri, Dec 15, 2023 at 3:53 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> There is definitely a body of future work in intelligently merging
>> compatible-but-not-equal environments. (Dataflow does this for example.)
>> Defining/detecting compatibility is not always easy, but sometimes is, and
>> we should at least cover those cases and grow them over time.
>>
>> On Fri, Dec 15, 2023 at 5:57 AM Joey Tran 
>> wrote:
>>
>>> Yeah I can confirm for the python runners (based on my reading of the
>>> translations.py [1]) that only identical environments are merged together.
>>>
>>> The funny thing is that we _originally_ implemented this hint as an
>>> annotation but then changed it to hint because it semantically felt more
>>> correct. I think we might go back to that since the environment merging
>>> logic isn't too flexible / easy to customize. Our type of hint is a bit
>>> unlike other hints anyways. Unlike resources like MinRam, these resources
>>> are additive (e.g. you can merge an environment that requires license A and
>>> an environment that requires license B into an environment that requires
>>> both A and B)
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4
>>>

Re: How do side inputs relate to stage fusion?

2023-12-15 Thread Joey Tran
Yeah, we already have `ResourceHint.get_merged_value(cls, outer_value,
inner_value)` for reconciling resources within a composite. In the future
we could possibly add another, similar method and have the environment
merging logic hook into that.
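
A minimal sketch of what such a merge hook could look like, written as plain Python so it does not depend on Beam. The URNs, the `MERGERS` registry, and `merge_hints` are hypothetical names for illustration only, not part of the Beam API. The sketch assumes hints are stored as `{urn: bytes}` maps, that a MinRam-style hint merges by taking the larger value, and that a license-style hint merges by taking the union, as described elsewhere in this thread.

```python
# Hypothetical URNs, used only for this illustration.
MIN_RAM = "example:min_ram_bytes"
LICENSES = "example:licenses"


def merge_min_ram(a: bytes, b: bytes) -> bytes:
    """Non-additive hint: the merged environment needs the larger request."""
    return str(max(int(a), int(b))).encode("ascii")


def merge_licenses(a: bytes, b: bytes) -> bytes:
    """Additive hint: the merged environment needs every license named."""
    names = set(a.decode().split(",")) | set(b.decode().split(","))
    names.discard("")
    return ",".join(sorted(names)).encode("ascii")


# Registry of per-hint merge hooks (an assumption of this sketch).
MERGERS = {MIN_RAM: merge_min_ram, LICENSES: merge_licenses}


def merge_hints(hints_a, hints_b):
    """Merge two {urn: bytes} hint maps, or return None if they conflict.

    A hint present on only one side is carried over unchanged; that choice
    is an assumption of this sketch, not documented runner behavior.
    """
    merged = dict(hints_a)
    for urn, value in hints_b.items():
        if urn not in merged or merged[urn] == value:
            merged[urn] = value
        elif urn in MERGERS:
            merged[urn] = MERGERS[urn](merged[urn], value)
        else:
            return None  # differing values and no hook: do not merge
    return merged


env_a = {LICENSES: b"A", MIN_RAM: b"1000"}
env_b = {LICENSES: b"B", MIN_RAM: b"4000"}
merged = merge_hints(env_a, env_b)
print(merged)
```

An environment-merging step could call something like `merge_hints` and fall back to keeping the environments separate whenever it returns `None`.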

On Fri, Dec 15, 2023 at 3:53 PM Robert Bradshaw via dev 
wrote:

> There is definitely a body of future work in intelligently merging
> compatible-but-not-equal environments. (Dataflow does this for example.)
> Defining/detecting compatibility is not always easy, but sometimes is, and
> we should at least cover those cases and grow them over time.
>
> On Fri, Dec 15, 2023 at 5:57 AM Joey Tran 
> wrote:
>
>> Yeah I can confirm for the python runners (based on my reading of the
>> translations.py [1]) that only identical environments are merged together.
>>
>> The funny thing is that we _originally_ implemented this hint as an
>> annotation but then changed it to hint because it semantically felt more
>> correct. I think we might go back to that since the environment merging
>> logic isn't too flexible / easy to customize. Our type of hint is a bit
>> unlike other hints anyways. Unlike resources like MinRam, these resources
>> are additive (e.g. you can merge an environment that requires license A and
>> an environment that requires license B into an environment that requires
>> both A and B)
>>
>> [1]
>> https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4
>>
>> On Fri, Dec 15, 2023 at 8:43 AM Robert Burke  wrote:
>>
>>> That would do it. We got so tunnel visioned on side inputs we missed
>>> that!
>>>
>>> IIRC the python local runner and Prism both only fuse transforms in
>>> identical environments together. So any environmental diffs will prevent
>>> fusion.
>>>
>>> Runners as a rule are usually free to ignore/manage hints as they like.
>>> Transform annotations might be an alternative, but how those are managed
>>> would be more SDK specific.
>>>

Re: How do side inputs relate to stage fusion?

2023-12-15 Thread Robert Bradshaw via dev
There is definitely a body of future work in intelligently merging
compatible-but-not-equal environments. (Dataflow does this for example.)
Defining/detecting compatibility is not always easy, but sometimes is, and
we should at least cover those cases and grow them over time.

On Fri, Dec 15, 2023 at 5:57 AM Joey Tran  wrote:

> Yeah I can confirm for the python runners (based on my reading of the
> translations.py [1]) that only identical environments are merged together.
>
> The funny thing is that we _originally_ implemented this hint as an
> annotation but then changed it to hint because it semantically felt more
> correct. I think we might go back to that since the environment merging
> logic isn't too flexible / easy to customize. Our type of hint is a bit
> unlike other hints anyways. Unlike resources like MinRam, these resources
> are additive (e.g. you can merge an environment that requires license A and
> an environment that requires license B into an environment that requires
> both A and B)
>
> [1]
> https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4
>
> On Fri, Dec 15, 2023 at 8:43 AM Robert Burke  wrote:
>
>> That would do it. We got so tunnel visioned on side inputs we missed that!
>>
>> IIRC the python local runner and Prism both only fuse transforms in
>> identical environments together. So any environmental diffs will prevent
>> fusion.
>>
>> Runners as a rule are usually free to ignore/manage hints as they like.
>> Transform annotations might be an alternative, but how those are managed
>> would be more SDK specific.
>>
>> On Fri, Dec 15, 2023, 5:21 AM Joey Tran 
>> wrote:
>>
>>> I figured out my issue. I thought side inputs were breaking up my
>>> pipeline but after experimenting with my transforms I now realize what was
>>> actually breaking it up was different transform environments that weren't
>>> considered compatible.
>>>
>>> We have a custom resource hint (for specifying whether a transform needs
>>> access to some software license) that we use with our transforms and that's
>>> what was preventing the fusion I was expecting. I'm looking into how to
>>> make these hints mergeable now.
>>>

Re: How do side inputs relate to stage fusion?

2023-12-15 Thread Joey Tran
Yeah I can confirm for the python runners (based on my reading of the
translations.py [1]) that only identical environments are merged together.

The funny thing is that we _originally_ implemented this hint as an
annotation but then changed it to a hint because it semantically felt more
correct. I think we might go back to that since the environment merging
logic isn't too flexible / easy to customize. Our type of hint is a bit
unlike other hints anyways. Unlike resources like MinRam, these resources
are additive (e.g. you can merge an environment that requires license A and
an environment that requires license B into an environment that requires
both A and B)

[1]
https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4

On Fri, Dec 15, 2023 at 8:43 AM Robert Burke  wrote:

> That would do it. We got so tunnel visioned on side inputs we missed that!
>
> IIRC the python local runner and Prism both only fuse transforms in
> identical environments together. So any environmental diffs will prevent
> fusion.
>
> Runners as a rule are usually free to ignore/manage hints as they like.
> Transform annotations might be an alternative, but how those are managed
> would be more SDK specific.
>
> On Fri, Dec 15, 2023, 5:21 AM Joey Tran  wrote:
>
>> I figured out my issue. I thought side inputs were breaking up my
>> pipeline but after experimenting with my transforms I now realize what was
>> actually breaking it up was different transform environments that weren't
>> considered compatible.
>>
>> We have a custom resource hint (for specifying whether a transform needs
>> access to some software license) that we use with our transforms and that's
>> what was preventing the fusion I was expecting. I'm looking into how to
>> make these hints mergeable now.
>>
>> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke  wrote:
>>
>>> Building on what Robert Bradshaw has said: if these fusion breaks didn't
>>> exist, the pipeline could livelock, because the side input would be unable
>>> to finish computing for a given input element's window.
>>>
>>> I recently added fusion to the Go Prism runner based on the Python side
>>> input semantics, and I was surprised that there are basically two rules
>>> for fusion: the side input one, and one for handling Stateful processing.
>>>
>>>
>>> This code is the greedy fusion algorithm that Python uses, but a bit
>>> less set-based, so it might be easier to follow:
>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>>
>>> From the linked code comment:
>>>
>>> Side Inputs: A transform S consuming a PCollection as a side input can't
>>>  be fused with the transform P that produces that PCollection. Further,
>>> no transform S+ descended from S, can be fused with transform P.
>>>
>>> Ideally I'll add visual representations of the graphs in the test suite
>>> here, that validates the side input dependency logic:
>>>
>>>
>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>>
>>> (Note that the test doesn't validate expected fusion results; Prism is a
>>> work in progress.)
>>>
>>>
>>> As for the Stateful rule, this is largely an implementation convenience
>>> for runners to ensure correct execution.
>>> If your pipeline also uses Stateful transforms or SplittableDoFns, those
>>> are usually relegated to the root of a fused stage and avoid fusing with
>>> each other. That can also cause additional stages.
>>>
>>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>>> multiple stateful transforms could be fused in the same stage. But that's a
>>> very different discussion.
>>>

Re: How do side inputs relate to stage fusion?

2023-12-15 Thread Robert Burke
That would do it. We got so tunnel visioned on side inputs we missed that!

IIRC the python local runner and Prism both only fuse transforms in
identical environments together. So any environmental diffs will prevent
fusion.

Runners as a rule are usually free to ignore/manage hints as they like.
Transform annotations might be an alternative, but how those are managed
would be more SDK specific.
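
To illustrate the point about identical environments, here is a small, self-contained sketch. It is not the code of either runner: `Environment`, `fuse_linear_chain`, the transform names, and the `example:license` URN are all hypothetical. It assumes a purely linear pipeline and treats two environments as fusible only when they compare equal, hints included.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Environment:
    image: str
    hints: frozenset  # set of (urn, value) pairs


def fuse_linear_chain(chain):
    """Greedily group adjacent transforms whose environments are identical.

    `chain` is a list of (transform_name, Environment) in pipeline order.
    """
    stages = []
    for name, env in chain:
        if stages and stages[-1][0] == env:
            stages[-1][1].append(name)  # same environment: extend the stage
        else:
            stages.append((env, [name]))  # any difference starts a new stage
    return [names for _, names in stages]


plain = Environment("py-sdk", frozenset())
licensed = Environment("py-sdk", frozenset({("example:license", "A")}))

chain = [
    ("Read", plain),
    ("Parse", plain),
    ("Solve", licensed),  # carries a custom resource hint
    ("Write", plain),
]
stages = fuse_linear_chain(chain)
print(stages)
```

In this sketch the single hinted transform splits an otherwise linear chain into three stages, which matches the symptom described in this thread.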

On Fri, Dec 15, 2023, 5:21 AM Joey Tran  wrote:

> I figured out my issue. I thought side inputs were breaking up my pipeline
> but after experimenting with my transforms I now realize what was actually
> breaking it up was different transform environments that weren't considered
> compatible.
>
> We have a custom resource hint (for specifying whether a transform needs
> access to some software license) that we use with our transforms and that's
> what was preventing the fusion I was expecting. I'm looking into how to
> make these hints mergeable now.
>
> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke  wrote:
>
>> Building on what Robert Bradshaw has said: if these fusion breaks didn't
>> exist, the pipeline could livelock, because the side input would be unable
>> to finish computing for a given input element's window.
>>
>> I recently added fusion to the Go Prism runner based on the Python side
>> input semantics, and I was surprised that there are basically two rules
>> for fusion: the side input one, and one for handling Stateful processing.
>>
>>
>> This code is the greedy fusion algorithm that Python uses, but a bit
>> less set-based, so it might be easier to follow:
>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>
>> From the linked code comment:
>>
>> Side Inputs: A transform S consuming a PCollection as a side input can't
>>  be fused with the transform P that produces that PCollection. Further,
>> no transform S+ descended from S, can be fused with transform P.
>>
>> Ideally I'll add visual representations of the graphs in the test suite
>> here, that validates the side input dependency logic:
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>
>> (Note that the test doesn't validate expected fusion results; Prism is a
>> work in progress.)
>>
>>
>> As for the Stateful rule, this is largely an implementation convenience
>> for runners to ensure correct execution.
>> If your pipeline also uses Stateful transforms or SplittableDoFns, those
>> are usually relegated to the root of a fused stage and avoid fusing with
>> each other. That can also cause additional stages.
>>
>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>> multiple stateful transforms could be fused in the same stage. But that's a
>> very different discussion.
>>
>> On Thu, Dec 14, 2023, 4:03 PM Joey Tran 
>> wrote:
>>
>>> Thanks for the explanation!
>>>
>>> That matches with my intuition - are there any other rules with side
>>> inputs?
>>>
>>> I might be misunderstanding the actual cause of the fusion breaks in our
>>> pipeline, but we essentially have one part of the graph that produces many
>>> small collections that are used as side inputs in the remaining part of the
>>> graph. In other words, the "main graph" is mostly linear but uses side
>>> inputs from the earlier part of the graph.
>>>
>>>  Since the main graph is mostly linear, I expected few stages, but what
>>> I actually see are a lot of breaks around the side input requiring
>>> transforms.
>>>
>>>
>>> Tangentially, are there any general tips for understanding why a graph
>>> might be fused the way it was?
>>>

Re: How do side inputs relate to stage fusion?

2023-12-15 Thread Joey Tran
I figured out my issue. I thought side inputs were breaking up my pipeline
but after experimenting with my transforms I now realize what was actually
breaking it up was different transform environments that weren't considered
compatible.

We have a custom resource hint (for specifying whether a transform needs
access to some software license) that we use with our transforms and that's
what was preventing the fusion I was expecting. I'm looking into how to
make these hints mergeable now.

On Thu, Dec 14, 2023 at 7:46 PM Robert Burke  wrote:

> Building on what Robert Bradshaw has said: if these fusion breaks didn't
> exist, the pipeline could livelock, because the side input would be unable
> to finish computing for a given input element's window.
>
> I recently added fusion to the Go Prism runner based on the Python side
> input semantics, and I was surprised that there are basically two rules
> for fusion: the side input one, and one for handling Stateful processing.
>
>
> This code is the greedy fusion algorithm that Python uses, but a bit
> less set-based, so it might be easier to follow:
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>
> From the linked code comment:
>
> Side Inputs: A transform S consuming a PCollection as a side input can't
>  be fused with the transform P that produces that PCollection. Further,
> no transform S+ descended from S, can be fused with transform P.
>
> Ideally I'll add visual representations of the graphs in the test suite
> here, that validates the side input dependency logic:
>
>
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>
> (Note that the test doesn't validate expected fusion results; Prism is a
> work in progress.)
>
>
> As for the Stateful rule, this is largely an implementation convenience
> for runners to ensure correct execution.
> If your pipeline also uses Stateful transforms or SplittableDoFns, those
> are usually relegated to the root of a fused stage and avoid fusing with
> each other. That can also cause additional stages.
>
> If Beam adopted a rigorous notion of Key Preserving for transforms,
> multiple stateful transforms could be fused in the same stage. But that's a
> very different discussion.
>
> On Thu, Dec 14, 2023, 4:03 PM Joey Tran  wrote:
>
>> Thanks for the explanation!
>>
>> That matches with my intuition - are there any other rules with side
>> inputs?
>>
>> I might be misunderstanding the actual cause of the fusion breaks in our
>> pipeline, but we essentially have one part of the graph that produces many
>> small collections that are used as side inputs in the remaining part of the
>> graph. In other words, the "main graph" is mostly linear but uses side
>> inputs from the earlier part of the graph.
>>
>>  Since the main graph is mostly linear, I expected few stages, but what I
>> actually see are a lot of breaks around the side input requiring transforms.
>>
>>
>> Tangentially, are there any general tips for understanding why a graph
>> might be fused the way it was?
>>
>> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> That is correct. Side inputs give a view of the "whole" PCollection and
>>> hence introduce a barrier that breaks fusion. For example, suppose DoFnA
>>> produces two outputs, mainPColl and sidePColl, that are consumed (as the
>>> main and side input respectively) by DoFnB.
>>>
>>>                    mainPColl ------ DoFnB
>>>                  /                    ^
>>> inPColl -- DoFnA                      |
>>>                  \                    |
>>>                    sidePColl -------- /
>>>
>>>
>>> Now DoFnB may iterate over the entirety of sidePColl for every element of
>>> mainPColl. This means that DoFnA and DoFnB cannot be fused: fusion would
>>> require DoFnB to consume the elements as they are produced by DoFnA, but
>>> DoFnA must run to completion before we know the contents of sidePColl.
>>>
>>> Similar constraints apply in larger graphs (e.g. there may be many
>>> intermediate DoFns and PCollections), but they principally boil down to
>>> shapes that look like this.
>>>
>>> Though this does not introduce a global barrier in streaming, there is
>>> still the analogous per window/watermark barrier that prevents fusion for
>>> the same reasons.
>>>
>>>
>>>
>>>

Re: How do side inputs relate to stage fusion?

2023-12-14 Thread Robert Burke
Building on what Robert Bradshaw has said: if these fusion breaks didn't
exist, the pipeline could livelock, because the side input would be unable
to finish computing for a given input element's window.

I recently added fusion to the Go Prism runner based on the Python side
input semantics, and I was surprised that there are basically two rules
for fusion: the side input one, and one for handling Stateful processing.


This code is the greedy fusion algorithm that Python uses, but a bit
less set-based, so it might be easier to follow:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513

From the linked code comment:

Side Inputs: A transform S consuming a PCollection as a side input can't
 be fused with the transform P that produces that PCollection. Further,
no transform S+ descended from S, can be fused with transform P.
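
The rule quoted above can be sketched in a few lines of plain Python. This is not the Prism or Python runner code; `descendants`, `forbidden_fusions`, and the extra downstream transform `DoFnC` are hypothetical names, and the sketch assumes the pipeline is given as (producer, consumer) edges split into main-input and side-input edges.

```python
from collections import defaultdict


def descendants(consumers, start):
    """Return every transform reachable downstream of `start`, including it."""
    seen, stack = set(), [start]
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        stack.extend(consumers.get(node, ()))
    return seen


def forbidden_fusions(main_edges, side_edges):
    """Pairs (P, T) that must not share a stage under the side input rule.

    For each side-input edge P -> S, neither S nor anything descended from S
    may be fused with P.
    """
    consumers = defaultdict(list)
    for producer, consumer in main_edges + side_edges:
        consumers[producer].append(consumer)
    forbidden = set()
    for producer, side_consumer in side_edges:
        for transform in descendants(consumers, side_consumer):
            forbidden.add((producer, transform))
    return forbidden


# DoFnA emits mainPColl and sidePColl; DoFnB consumes both.
# DoFnC is an assumed extra transform downstream of DoFnB.
main_edges = [("DoFnA", "DoFnB"), ("DoFnB", "DoFnC")]
side_edges = [("DoFnA", "DoFnB")]
blocked = forbidden_fusions(main_edges, side_edges)
print(sorted(blocked))
```

A fusion pass could consult a set like `blocked` before adding a transform to the stage that contains its producer.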

Ideally I'll add visual representations of the graphs to the test suite
here, which validates the side input dependency logic:

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398

(Note that the test doesn't validate expected fusion results; Prism is a
work in progress.)


As for the Stateful rule, this is largely an implementation convenience for
runners to ensure correct execution.
If your pipeline also uses Stateful transforms or SplittableDoFns, those
are usually relegated to the root of a fused stage and avoid fusing with
each other. That can also cause additional stages.

If Beam adopted a rigorous notion of Key Preserving for transforms,
multiple stateful transforms could be fused in the same stage. But that's a
very different discussion.

On Thu, Dec 14, 2023, 4:03 PM Joey Tran  wrote:

> Thanks for the explanation!
>
> That matches with my intuition - are there any other rules with side
> inputs?
>
> I might be misunderstanding the actual cause of the fusion breaks in our
> pipeline, but we essentially have one part of the graph that produces many
> small collections that are used as side inputs in the remaining part of the
> graph. In other words, the "main graph" is mostly linear but uses side
> inputs from the earlier part of the graph.
>
>  Since the main graph is mostly linear, I expected few stages, but what I
> actually see are a lot of breaks around the side input requiring transforms.
>
>
> Tangentially, are there any general tips for understanding why a graph
> might be fused the way it was?
>
> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev 
> wrote:
>
>> That is correct. Side inputs give a view of the "whole" PCollection and
>> hence introduce a barrier that breaks fusion. For example, suppose DoFnA
>> produces two outputs, mainPColl and sidePColl, that are consumed (as the
>> main and side input respectively) by DoFnB.
>>
>>                    mainPColl ------ DoFnB
>>                  /                    ^
>> inPColl -- DoFnA                      |
>>                  \                    |
>>                    sidePColl -------- /
>>
>>
>> Now DoFnB may iterate over the entirety of sidePColl for every element of
>> mainPColl. This means that DoFnA and DoFnB cannot be fused: fusion would
>> require DoFnB to consume the elements as they are produced by DoFnA, but
>> DoFnA must run to completion before we know the contents of sidePColl.
>>
>> Similar constraints apply in larger graphs (e.g. there may be many
>> intermediate DoFns and PCollections), but they principally boil down to
>> shapes that look like this.
>>
>> Though this does not introduce a global barrier in streaming, there is
>> still the analogous per window/watermark barrier that prevents fusion for
>> the same reasons.
>>
>>
>>
>>
>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran 
>> wrote:
>>
>>> Hey all,
>>>
>>> We have a pretty big pipeline and while I was inspecting the stages, I
>>> noticed there is less fusion than I expected. I suspect it has to do with
>>> the heavy use of side inputs in our workflow. In the python sdk, I see that
>>> side inputs are considered when determining whether two stages are fusible.
>>> I have a hard time getting a clear understanding of the logic though. Could
>>> someone clarify / summarize the rules around this?
>>>
>>> Thanks!
>>> Joey
>>>
>>


Re: How do side inputs relate to stage fusion?

2023-12-14 Thread Joey Tran
Thanks for the explanation!

That matches my intuition. Are there any other rules around side
inputs?

I might be misunderstanding the actual cause of the fusion breaks in our
pipeline, but we essentially have one part of the graph that produces many
small collections that are used as side inputs in the remaining part of the
graph. In other words, the "main graph" is mostly linear but uses side
inputs from the earlier part of the graph.

Since the main graph is mostly linear, I expected few stages, but what I
actually see are a lot of breaks around the transforms that require side
inputs.


Tangentially, are there any general tips for understanding why a graph
might be fused the way it was?



Re: How do side inputs relate to stage fusion?

2023-12-14 Thread Robert Bradshaw via dev
That is correct. Side inputs give a view of the "whole" PCollection and
hence introduce a fusion-breaking barrier. For example, suppose one has a
DoFnA that produces two outputs, mainPColl and sidePColl, that are consumed
(as the main and side input respectively) by DoFnB.

                    mainPColl ----- DoFnB
                  /                   ^
inPColl -- DoFnA                      |
                  \                   |
                    sidePColl --------/


Now DoFnB may iterate over the entirety of sidePColl for every element of
mainPColl. This means that DoFnA and DoFnB cannot be fused: fusion would
require DoFnB to consume the elements as they are produced by DoFnA, but
DoFnA must run to completion before we know the contents of sidePColl.
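
To make the argument concrete, here is a small plain-Python simulation
(not Beam code). The helpers `do_fn_a`, `do_fn_b`, `run_fused` and
`run_staged` are hypothetical stand-ins: DoFnB sums the side input for each
main element, and the "fused" variant shows what would go wrong if DoFnB
ran while DoFnA was still producing.

```python
def do_fn_a(in_pcoll):
    """Yield ('main', x) and ('side', x * 10) outputs, interleaved."""
    for x in in_pcoll:
        yield ("main", x)
        yield ("side", x * 10)


def do_fn_b(main_element, side_view):
    """Pair a main element with the sum of the side input it can see."""
    return main_element, sum(side_view)


def run_fused(in_pcoll):
    # Hypothetical fused execution: DoFnB runs as soon as DoFnA emits a
    # main element, so it only sees the side elements produced so far.
    side_so_far, results = [], []
    for tag, value in do_fn_a(in_pcoll):
        if tag == "side":
            side_so_far.append(value)
        else:
            results.append(do_fn_b(value, side_so_far))
    return results


def run_staged(in_pcoll):
    # Two stages: DoFnA runs to completion first, then DoFnB reads the
    # fully materialized side input for every main element.
    outputs = list(do_fn_a(in_pcoll))
    side = [value for tag, value in outputs if tag == "side"]
    return [do_fn_b(value, side) for tag, value in outputs if tag == "main"]


print(run_fused([1, 2, 3]))   # [(1, 0), (2, 10), (3, 30)]
print(run_staged([1, 2, 3]))  # [(1, 60), (2, 60), (3, 60)]
```

Only the staged run gives every main element the same, complete view of
sidePColl, which is why the two DoFns end up in separate stages.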

Similar constraints apply in larger graphs (e.g. there may be many
intermediate DoFns and PCollections), but they principally boil down to
shapes that look like this.

Though this does not introduce a global barrier in streaming, there is
still an analogous per-window/watermark barrier that prevents fusion for
the same reasons.






How do side inputs relate to stage fusion?

2023-12-14 Thread Joey Tran
Hey all,

We have a pretty big pipeline and while I was inspecting the stages, I
noticed there is less fusion than I expected. I suspect it has to do with
the heavy use of side inputs in our workflow. In the Python SDK, I see that
side inputs are considered when determining whether two stages are fusible.
I have a hard time getting a clear understanding of the logic though. Could
someone clarify / summarize the rules around this?

Thanks!
Joey