Re: PTransform.expand() guarantees

2019-06-24 Thread Alexey Romanenko
On 21 Jun 2019, at 19:12, Lukasz Cwik wrote:
> > This question was triggered by an issue in KinesisIO [1], where
> > Write.expand() performed a rather expensive check that the stream exists
> > (by calling the Kinesis backend) and, according to the issue reporter, it
> > was called several times, though I had always believed it should be called
> > only once. That puzzled me and led me to these questions.
> >
> > [1] https://issues.apache.org/jira/browse/BEAM-7357
>
> Interesting, did you print a stack trace to figure out all the call site
> locations?
> It could be something simple where we are calling expand multiple times
> needlessly and only using the result once.

No, since it was not me who reported this, and I could not reproduce it on
my side. Now I think the actual issue could be caused by two correlated
things, and it is not really related to how many times “expand()” was called.
Anyway, adding more details about the expected “expand()” behaviour to the
documentation would probably be helpful.

Re: PTransform.expand() guarantees

2019-06-21 Thread Lukasz Cwik
On Fri, Jun 21, 2019 at 10:01 AM Alexey Romanenko 
wrote:

> Thank you for the answers, Lukasz.
>
> On 21 Jun 2019, at 18:15, Lukasz Cwik wrote:
>
>> Does Beam guarantee where (at the "driver" or at a "worker" of the backend
>> system) "PTransform.expand()" of a provided transform will be called?
>>
> No. There are use cases where the driver is run in the "cloud", such as
> template generation and also during cross-language pipeline expansion. At
> some point in time when I was investigating loops within Beam, one possible
> solution would have been to call expand() in the "worker" whenever a new
> loop iteration needed to be generated.
>
>
> Off-topic: what was the resolution regarding loop support? Is there any
> public doc about it?
>

https://issues.apache.org/jira/browse/BEAM-106 is the feature request.
Support for iteration got to the point where I had a working demo in
Dataflow batch mode, but due to lack of time I couldn't get it working in
streaming. The conceptual idea from the model was to treat iteration as an
infinitely unrollable loop that could be unrolled dynamically during pipeline
execution. This seemed much easier than updating the entire system to support
multidimensional watermarks. It also seemed difficult to get support in
other runners, since they either couldn't dynamically modify the execution
graph at runtime or didn't support multidimensional watermarks.
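The "infinitely unrollable loop" idea can be illustrated with a toy model in plain Java. This is a sketch under loose assumptions, not Beam code and not the BEAM-106 prototype: `UnrolledLoop` and `UnrolledLoopDemo` are hypothetical names, and watermarks are ignored entirely. It only shows the key property that a new iteration is generated on demand, so the number of stages is not fixed when the pipeline is built.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Hypothetical toy model, not Beam API: each loop iteration is "expanded"
// into a new stage only when the previous stage shows more work is needed,
// so the number of stages is not fixed when the pipeline is built.
class UnrolledLoop {
  final List<String> expandedStages = new ArrayList<>();

  int run(int input, IntUnaryOperator body, IntPredicate done) {
    int value = input;
    while (!done.test(value)) {
      // Stand-in for calling expand() at runtime to generate the next iteration.
      expandedStages.add("iteration-" + expandedStages.size());
      value = body.applyAsInt(value);
    }
    return value;
  }
}

class UnrolledLoopDemo {
  public static void main(String[] args) {
    UnrolledLoop loop = new UnrolledLoop();
    // Halve until the value drops below 10: 100 -> 50 -> 25 -> 12 -> 6.
    int result = loop.run(100, v -> v / 2, v -> v < 10);
    System.out.println(result + " after " + loop.expandedStages.size() + " stages");
  }
}
```

Running `UnrolledLoopDemo` prints `6 after 4 stages`; with a different input the same loop body would unroll into a different number of stages.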


> Does Beam guarantee how many times it will happen?
>>
> It should happen once per transform instance, but why is it important?
>
>
> This question was triggered by an issue in KinesisIO [1], where
> Write.expand() performed a rather expensive check that the stream exists
> (by calling the Kinesis backend) and, according to the issue reporter, it
> was called several times, though I had always believed it should be called
> only once. That puzzled me and led me to these questions.
>
> [1] https://issues.apache.org/jira/browse/BEAM-7357
>

Interesting, did you print a stack trace to figure out all the call site
locations?
It could be something simple where we are calling expand multiple times
needlessly and only using the result once.
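Recording the call sites of expand() can be done by capturing a stack trace inside it. The sketch below uses only the JDK; `TracedTransform` and its caller methods are hypothetical stand-ins for a real transform and the framework code that invokes it, not Beam API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transform, not the Beam API: it records the
// direct caller every time expand() runs, to find unexpected call sites.
class TracedTransform {
  final List<String> callers = new ArrayList<>();

  String expand(String input) {
    // A Throwable's stack trace starts at the frame that created it:
    // element 0 is expand() itself, element 1 is its direct caller.
    StackTraceElement caller = new Throwable().getStackTrace()[1];
    callers.add(caller.getClassName() + "." + caller.getMethodName());
    // For quick debugging, printing the full trace to stderr also works:
    // new Throwable("expand() called").printStackTrace();
    return input + " -> expanded";
  }
}

class TracedTransformDemo {
  static void applyToPipeline(TracedTransform t) { t.expand("input"); }

  static void validatePipeline(TracedTransform t) { t.expand("input"); }

  public static void main(String[] args) {
    TracedTransform transform = new TracedTransform();
    applyToPipeline(transform);
    validatePipeline(transform);
    // Two entries mean expand() was reached from two different call sites.
    System.out.println(transform.callers);
  }
}
```

In a real investigation the full trace (the commented-out line) is usually more useful, since the direct caller alone may be a generic framework method.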


Re: PTransform.expand() guarantees

2019-06-21 Thread Alexey Romanenko
Thank you for the answers, Lukasz. 

> On 21 Jun 2019, at 18:15, Lukasz Cwik wrote:
> Does Beam guarantee where (at the "driver" or at a "worker" of the backend
> system) "PTransform.expand()" of a provided transform will be called?
> No. There are use cases where the driver is run in the "cloud", such as
> template generation and also during cross-language pipeline expansion. At
> some point in time when I was investigating loops within Beam, one possible
> solution would have been to call expand() in the "worker" whenever a new loop
> iteration needed to be generated.

Off-topic: what was the resolution regarding loop support? Is there any public
doc about it?

> Does Beam guarantee how many times it will happen?
> It should happen once per transform instance, but why is it important?

This question was triggered by an issue in KinesisIO [1], where
Write.expand() performed a rather expensive check that the stream exists (by
calling the Kinesis backend) and, according to the issue reporter, it was
called several times, though I had always believed it should be called only
once. That puzzled me and led me to these questions.

[1] https://issues.apache.org/jira/browse/BEAM-7357

Re: PTransform.expand() guarantees

2019-06-21 Thread Lukasz Cwik
On Fri, Jun 21, 2019 at 9:07 AM Alexey Romanenko 
wrote:

> Hello,
>
> I tried to find answers to the questions below in the documentation but
> didn't manage to. There are actually three related questions:
>
> Does Beam guarantee where (at the "driver" or at a "worker" of the backend
> system) "PTransform.expand()" of a provided transform will be called?
>
No. There are use cases where the driver is run in the "cloud", such as
template generation and also during cross-language pipeline expansion. At
some point in time when I was investigating loops within Beam, one possible
solution would have been to call expand() in the "worker" whenever a new
loop iteration needed to be generated.


> Does Beam guarantee how many times it will happen?
>
It should happen once per transform instance, but why is it important?
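If a transform does something expensive in expand(), such as the KinesisIO stream check from BEAM-7357, one defensive option is to make that work run at most once per transform instance, whatever the number of expand() calls turns out to be. This is a minimal sketch in plain Java, assuming nothing about the real KinesisIO code: `CheckedWrite`, `streamExists()` and the call counter are hypothetical, and the "backend call" is simulated.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical write transform, not the actual KinesisIO code. streamExists()
// simulates a remote call such as checking that a Kinesis stream exists.
class CheckedWrite {
  private final String streamName;
  private final AtomicInteger backendCalls;
  private boolean streamChecked = false;

  CheckedWrite(String streamName, AtomicInteger backendCalls) {
    this.streamName = streamName;
    this.backendCalls = backendCalls;
  }

  private boolean streamExists() {
    backendCalls.incrementAndGet(); // count each simulated round trip
    return !streamName.isEmpty();
  }

  String expand(String input) {
    // Perform the expensive check at most once per transform instance,
    // even if expand() ends up being invoked several times.
    synchronized (this) {
      if (!streamChecked) {
        if (!streamExists()) {
          throw new IllegalArgumentException("Stream does not exist: " + streamName);
        }
        streamChecked = true;
      }
    }
    return input + " -> write(" + streamName + ")";
  }
}

class CheckedWriteDemo {
  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    CheckedWrite write = new CheckedWrite("events", calls);
    write.expand("first");
    write.expand("second");
    System.out.println("backend calls: " + calls.get());
  }
}
```

Running `CheckedWriteDemo` prints `backend calls: 1`. The guard only helps within one process: since expand() is not guaranteed to run on the driver (template generation and cross-language expansion are mentioned above), each process that expands the transform would still perform the check once.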


> Does it depend on the runner implementation or anything else?
>
It should not, but historically this has happened in some places. Some
transforms were written with logic like "if this is a streaming pipeline, or
if it is running on Dataflow, then do X". We have tried to prevent this and
have cleaned up the places where we noticed it, since it makes pipelines
hard to port across runners.
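The runner-dependent branching described above, and a portable alternative, can be contrasted in a small plain-Java sketch. All names here (`ToyOptions`, `Expandable`, `RunnerSpecificWrite`, `PortableWrite`) are hypothetical and do not correspond to Beam's `PipelineOptions` or `PTransform` APIs; only the runner name strings mirror real Beam runners.

```java
// Hypothetical pipeline options holder; not Beam's PipelineOptions.
class ToyOptions {
  final boolean streaming;
  final String runner;

  ToyOptions(boolean streaming, String runner) {
    this.streaming = streaming;
    this.runner = runner;
  }
}

interface Expandable {
  String expand(ToyOptions options);
}

// Anti-pattern: the expanded graph depends on the runner and execution mode.
class RunnerSpecificWrite implements Expandable {
  @Override
  public String expand(ToyOptions options) {
    if (options.streaming || "DataflowRunner".equals(options.runner)) {
      return "BufferedWrite";
    }
    return "DirectWrite";
  }
}

// Portable alternative: the expansion depends only on the transform's own
// configuration; the options are deliberately not inspected.
class PortableWrite implements Expandable {
  private final int batchSize;

  PortableWrite(int batchSize) {
    this.batchSize = batchSize;
  }

  @Override
  public String expand(ToyOptions options) {
    return "BatchedWrite(batchSize=" + batchSize + ")";
  }
}

class PortabilityDemo {
  public static void main(String[] args) {
    ToyOptions batchDirect = new ToyOptions(false, "DirectRunner");
    ToyOptions streamingFlink = new ToyOptions(true, "FlinkRunner");
    for (Expandable t : new Expandable[] {new RunnerSpecificWrite(), new PortableWrite(500)}) {
      System.out.println(t.expand(batchDirect) + " / " + t.expand(streamingFlink));
    }
  }
}
```

The first line printed by `PortabilityDemo` shows two different graphs for the same transform; the second shows the same graph under both configurations, leaving any runner-specific choice to the runner itself.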

We have always wanted the driver program (wherever it may live) to hand a
whole pipeline definition to the runner, which can then "optimize" it by
performing any additional PTransform replacements.
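The division of labour described here, where the driver builds the complete definition and the runner then substitutes transforms, can be modelled in a few lines of plain Java. This is a toy sketch, not Beam's replacement mechanism: `ToyRunner` is hypothetical, and the "pipeline" is simplified to an ordered list of transform names rather than a graph.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical toy runner, not Beam's API. The "pipeline" is an ordered list
// of transform names; real Beam pipelines are graphs of PTransforms.
class ToyRunner {
  private final Map<String, UnaryOperator<String>> overrides;

  ToyRunner(Map<String, UnaryOperator<String>> overrides) {
    this.overrides = overrides;
  }

  // Receives the complete definition built by the driver and swaps in
  // runner-specific replacements; transforms without an override are kept.
  List<String> optimize(List<String> pipeline) {
    List<String> result = new ArrayList<>();
    for (String transform : pipeline) {
      UnaryOperator<String> override = overrides.get(transform);
      result.add(override == null ? transform : override.apply(transform));
    }
    return result;
  }
}

class ToyRunnerDemo {
  public static void main(String[] args) {
    List<String> pipeline = List.of("Read", "GroupByKey", "Write");
    Map<String, UnaryOperator<String>> overrides =
        Map.of("GroupByKey", name -> "RunnerNative" + name);
    System.out.println(new ToyRunner(overrides).optimize(pipeline));
  }
}
```

Running `ToyRunnerDemo` prints `[Read, RunnerNativeGroupByKey, Write]`. Because the substitution happens after construction, the transforms themselves never need to ask which runner they are on.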


PTransform.expand() guarantees

2019-06-21 Thread Alexey Romanenko
Hello,

I tried to find answers to the questions below in the documentation but
didn't manage to. There are actually three related questions:

Does Beam guarantee where (at the "driver" or at a "worker" of the backend
system) "PTransform.expand()" of a provided transform will be called?
Does Beam guarantee how many times it will happen?
Does it depend on the runner implementation or anything else?