Flink 1.18 quickstart issues

2024-05-22 Thread Jan Lukavský

Hi,

currently our documentation for running quickstart examples uses Maven 
and the `exec:java` mojo. Unfortunately, there is a (not yet confirmed, but 
likely, see [1], [2]) Flink bug which causes Flink runner 1.18 to fail. 
The bug causes MiniCluster (which is used in local testing) to use the wrong 
classloader to load Flink's classes, resulting in 
ClassNotFoundException. The only known workaround is using 
`exec:exec` to fork a new process that has the correct classpath right from the 
start of the Java process (exec:java uses the context classloader before 
running the user-code main() method). This can be done quite easily for 
running post-commit tests (runQuickstartJavaFlinkLocal), but any user 
who tries to run examples the documented way (running mvn 
exec:java) would run into the bug.


The question is - can we change the way we run examples, so that a new 
JVM process with a correctly set-up classpath can be spawned to run the user 
code? What would be the best way of doing this? Otherwise we must either 
fix the (probable) bug in Flink or release version 1.18 of Flink Runner 
which will fail to run quickstart examples. More info in [3].


Thanks for any suggestions.

 Jan

[1] https://www.mail-archive.com/user@flink.apache.org/msg52035.html

[2] https://lists.apache.org/thread/wdo4rz4q9qh0j11o5b76d45zlqkdx55n

[3] https://github.com/apache/beam/pull/31062



Re: PCollection#applyWindowingStrategyInternal

2024-05-06 Thread Jan Lukavský
Right. Generally speaking, merging of state would require some 
modifications to the model to support at least some hooks. +1


On 4/25/24 18:03, Reuven Lax via dev wrote:
I think there is more to it than that. You'll probably want 
retractions integrated into Beam's core triggering.


One example of where retractions are needed is with session windows. 
Early triggers are fairly broken with session windows because the 
windows themselves change as more data arrive. So an early trigger 
might generate data for two separate windows S1 and S2, but after more 
data arrives those two windows merge into a single S3. Retractions 
solve this by hooking into the window merging logic, retracting the 
outputs for S1 and S2 before outputting S3. I don't think this is 
possible today with a DSL.
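
The S1/S2/S3 scenario above can be sketched as a toy model in plain Java. This is not Beam API: the class `SessionMergeRetractions`, the `onMerge` hook and the string records are all assumptions made for illustration, and the merged window is simplified to the union of the old spans.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of retractions emitted on session-window merge; not Beam API. */
final class SessionMergeRetractions {

  /** A session window [start, end) together with the count already emitted for it. */
  static final class Session {
    final long start;
    final long end;
    final long count;

    Session(long start, long end, long count) {
      this.start = start;
      this.end = end;
      this.count = count;
    }

    @Override
    public String toString() {
      return "[" + start + "," + end + ") count=" + count;
    }
  }

  /**
   * Called when previously emitted sessions merge into one. Returns the records to emit:
   * one retraction per old session, followed by the output of the merged session.
   */
  static List<String> onMerge(List<Session> emitted, long newElements) {
    List<String> output = new ArrayList<>();
    long start = Long.MAX_VALUE;
    long end = Long.MIN_VALUE;
    long count = newElements;
    for (Session s : emitted) {
      output.add("RETRACT " + s); // undo the early-trigger output of the old window
      start = Math.min(start, s.start);
      end = Math.max(end, s.end);
      count += s.count;
    }
    output.add("EMIT " + new Session(start, end, count));
    return output;
  }

  public static void main(String[] args) {
    List<Session> emitted = new ArrayList<>();
    emitted.add(new Session(0, 10, 2)); // S1, already output by an early trigger
    emitted.add(new Session(15, 25, 3)); // S2, already output by an early trigger
    // One more element bridges the gap, so S1 and S2 merge into S3.
    for (String record : onMerge(emitted, 1)) {
      System.out.println(record);
    }
  }
}
```

Without the two retraction records, a downstream consumer would see the counts for S1, S2 and S3 and could not tell that S3 supersedes the first two.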


On Thu, Apr 25, 2024 at 5:46 AM Jan Lukavský  wrote:

> To implement retraction (at least to do so efficiently) I think
you'll want it integrated in the model. e.g. for combiners one
would want to add the option to subtract values, otherwise you
would end up having to store every element defeating the
performance that combiners provide.

I think we use different words for the same. :) This is what I
meant by "and define appropriate retraction functions to CombineFn
and the like".

On the other hand, you _could_ derive a CombineFn with retraction
capabilities: if you have a DSL that provides a "retraction
accumulation" function along with "addition accumulation", it
should be possible to combine them into a CombineFn as we have it
today. This should only require that the operation being combined
is associative, commutative and has a valid unary operator '-'
(which will be the result of the "retraction combine").
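
A minimal sketch of what such a combiner could look like, in plain Java rather than against the Beam SDK. The method names `createAccumulator`, `addInput` and `extractOutput` mirror Beam's CombineFn; `retractInput` and the `RetractableCombiner` interface itself are hypothetical names assumed for this example.

```java
/** Hypothetical shape of a combiner that can also subtract previously added inputs. */
interface RetractableCombiner<InT, AccT, OutT> {
  AccT createAccumulator();

  AccT addInput(AccT acc, InT input);

  /** The "retraction accumulation": the inverse of addInput. Hypothetical method. */
  AccT retractInput(AccT acc, InT input);

  OutT extractOutput(AccT acc);
}

/** Mean over longs; the accumulator is {sum, count}, so no element has to be stored. */
final class MeanCombiner implements RetractableCombiner<Long, long[], Double> {

  @Override
  public long[] createAccumulator() {
    return new long[] {0L, 0L};
  }

  @Override
  public long[] addInput(long[] acc, Long input) {
    return new long[] {acc[0] + input, acc[1] + 1};
  }

  @Override
  public long[] retractInput(long[] acc, Long input) {
    // Addition has an inverse, so a retraction is just a subtraction.
    return new long[] {acc[0] - input, acc[1] - 1};
  }

  @Override
  public Double extractOutput(long[] acc) {
    return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
  }

  public static void main(String[] args) {
    MeanCombiner mean = new MeanCombiner();
    long[] acc = mean.createAccumulator();
    acc = mean.addInput(acc, 10L);
    acc = mean.addInput(acc, 20L);
    acc = mean.addInput(acc, 30L);
    System.out.println(mean.extractOutput(acc));
    acc = mean.retractInput(acc, 30L); // retract one of the earlier inputs
    System.out.println(mean.extractOutput(acc));
  }
}
```

An operation without an inverse (max, for example) would not fit this shape, which is the case where every element would have to be kept.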

On 4/23/24 18:08, Reuven Lax via dev wrote:



On Tue, Apr 23, 2024 at 7:52 AM Jan Lukavský  wrote:

On 4/22/24 20:40, Kenneth Knowles wrote:


I'll go ahead and advertise
https://s.apache.org/beam-sink-triggers again for this thread.

+1


There are a couple of difficult technical problems in there.
One of them is backwards-propagating triggering to minimize
extra latency. We can probably solve this as well as we
solve forward-propagating without too much trouble. We also
can/should leave it abstract so runners can implement either
through back-propagating local triggering rules or using
run-time communication to trigger upstream. These actually
could interact well with stateful ParDo by sending a
"trigger now please" message or some such.

Yes, this was what I was referring to as the more
"functional" style for stateful ParDo. At minimum, it
requires adding new callback independent of @ProcessElement
and @OnTimer -  @OnTrigger?


But we also probably need retractions that automatically
flow through the pipeline and update aggregations. Why?
Because currently triggers don't just control update
frequency but actually create new elements each time, so
they require user fix-up logic to do the right thing with
the output. When we go to higher levels of abstraction we
need this to "just work" without changing the
pipeline. There have been two (nearly identical) prototypes
of adding retractions to the DirectRunner as proof of
concept. But there's also work in all the IOs since they are
not retraction-aware. Also lots of work in many library
transforms where a retraction should be computed by running
the transform "like normal" but then negating the result,
but that cannot be the default for ParDo because it is
deliberately more flexible, we just have to annotate Map and
the like.

+1. I think retractions could be implemented as DSL on top of
the current model. Retractions can be viewed as regular data
elements with additional metadata (upsert, delete). For ParDo
we could add something like @RetractElement (and define
appropriate retraction functions to CombineFn and the like).
We could introduce RetractedPCollection or similar for this
purpose.


To implement retraction (at least to do so efficiently) I think
you'll want it integrated in the model. e.g. for combiners one
would want to add the option to subtract values, otherwise you
would end up having to store every element defeating the
performance that combiners provide.



Getting all this right is a lot of work but would result in
a system that is simpler to use out-of-the-box and a more
robust SQL implementation (because you can't use triggers
with SQL unless you have retractions or some other "just
works" mode of computation

Re: [VOTE] Release 2.56.0, release candidate #2

2024-04-29 Thread Jan Lukavský

+1 (binding).

Tested Java SDK with Flink runner.

 Jan

On 4/28/24 15:32, XQ Hu via dev wrote:
+1 (non-binding). Tested it using the dataflow ML pipeline: 
https://github.com/google/dataflow-ml-starter/actions/runs/8862170843/job/24334816481


On Sat, Apr 27, 2024 at 7:42 AM Danny McCormick via dev wrote:


Hi everyone,
Please review and vote on the release candidate #2 for the version
2.56.0, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

Reviewers are encouraged to test their own use cases with the
release candidate, and vote +1 if no issues are found. Only PMC
member votes will count towards the final vote, but votes from all
community members are encouraged and helpful for finding
regressions; you can either test your own use cases [13] or use
cases from the validation sheet [10].

The complete staging area is available for your review, which
includes:
* GitHub Release notes [1],
* the official Apache source release to be deployed to
dist.apache.org [2], which is signed with
the key with fingerprint D20316F712213422 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "v2.56.0-RC2" [5],
* website pull request listing the release [6], the blog post [6],
and publishing the API reference manual [7].
* Python artifacts are deployed along with the source release to
the dist.apache.org [2] and PyPI [8].
* Go artifacts and documentation are available at pkg.go.dev [9]
* Validation sheet with a tab for 2.56.0 release to help with
validation [10].
* Docker images published to Docker Hub [11].
* PR to run tests against release branch [12].

The vote will be open for at least 72 hours. It is adopted by
majority approval, with at least 3 PMC affirmative votes.

For guidelines on how to try the release in your projects, check
out our RC testing guide [13].

Thanks,
Danny

[1] https://github.com/apache/beam/milestone/20
[2] https://dist.apache.org/repos/dist/dev/beam/2.56.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1377/
[5] https://github.com/apache/beam/tree/v2.56.0-RC2
[6] https://github.com/apache/beam/pull/31094
[7] https://github.com/apache/beam-site/pull/665
[8] https://pypi.org/project/apache-beam/2.56.0rc2/
[9] https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.56.0-RC2/go/pkg/beam
[10] https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1992402651
[11] https://hub.docker.com/search?q=apache%2Fbeam=image
[12] https://github.com/apache/beam/pull/31038
[13] https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md


Re: [VOTE] Release 2.56.0, release candidate #1

2024-04-26 Thread Jan Lukavský

+1 (binding)

Validated Java SDK with Flink runner.

 Jan

On 4/25/24 05:17, XQ Hu via dev wrote:
+1 (non binding). Tested the simple Dataflow ML job: 
https://github.com/google/dataflow-ml-starter/actions/runs/8824985423/job/24228468173


On Wed, Apr 24, 2024 at 2:01 PM Danny McCormick via dev wrote:


Hi everyone,
Please review and vote on the release candidate #1 for the version
2.56.0, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

Reviewers are encouraged to test their own use cases with the
release candidate, and vote +1 if no issues are found. Only PMC
member votes will count towards the final vote, but votes from all
community members are encouraged and helpful for finding
regressions; you can either test your own use cases [13] or use
cases from the validation sheet [10].

The complete staging area is available for your review, which
includes:
* GitHub Release notes [1],
* the official Apache source release to be deployed to
dist.apache.org [2], which is signed with
the key with fingerprint D20316F712213422 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "v2.56.0-RC1" [5],
* website pull request listing the release [6], the blog post [6],
and publishing the API reference manual [7].
* Python artifacts are deployed along with the source release to
the dist.apache.org [2] and PyPI [8].
* Go artifacts and documentation are available at pkg.go.dev [9]
* Validation sheet with a tab for 2.56.0 release to help with
validation [10].
* Docker images published to Docker Hub [11].
* PR to run tests against release branch [12].

The vote will be open for at least 72 hours. It is adopted by
majority approval, with at least 3 PMC affirmative votes.

For guidelines on how to try the release in your projects, check
out our RC testing guide [13].

Thanks,
Danny

[1] https://github.com/apache/beam/milestone/20
[2] https://dist.apache.org/repos/dist/dev/beam/2.56.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1376/
[5] https://github.com/apache/beam/tree/v2.56.0-RC1
[6] https://github.com/apache/beam/pull/31094
[7] https://github.com/apache/beam-site/pull/664
[8] https://pypi.org/project/apache-beam/2.56.0rc1/
[9] https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.56.0-RC1/go/pkg/beam
[10] https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1992402651
[11] https://hub.docker.com/search?q=apache%2Fbeam=image
[12] https://github.com/apache/beam/pull/31038
[13] https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md


Re: PCollection#applyWindowingStrategyInternal

2024-04-25 Thread Jan Lukavský
> To implement retraction (at least to do so efficiently) I think 
you'll want it integrated in the model. e.g. for combiners one would 
want to add the option to subtract values, otherwise you would end up 
having to store every element defeating the performance that combiners 
provide.


I think we use different words for the same. :) This is what I meant by 
"and define appropriate retraction functions to CombineFn and the like".


On the other hand, you _could_ derive a CombineFn with retraction 
capabilities: if you have a DSL that provides a "retraction accumulation" 
function along with "addition accumulation", it should be possible to 
combine them into a CombineFn as we have it today. This should only 
require that the operation being combined is associative, commutative 
and has a valid unary operator '-' (which will be the result of the 
"retraction combine").


On 4/23/24 18:08, Reuven Lax via dev wrote:



On Tue, Apr 23, 2024 at 7:52 AM Jan Lukavský  wrote:

On 4/22/24 20:40, Kenneth Knowles wrote:


I'll go ahead and advertise
https://s.apache.org/beam-sink-triggers again for this thread.

+1


There are a couple of difficult technical problems in there. One
of them is backwards-propagating triggering to minimize extra
latency. We can probably solve this as well as we solve
forward-propagating without too much trouble. We also can/should
leave it abstract so runners can implement either through
back-propagating local triggering rules or using run-time
communication to trigger upstream. These actually could interact
well with stateful ParDo by sending a "trigger now please"
message or some such.

Yes, this was what I was referring to as the more "functional"
style for stateful ParDo. At minimum, it requires adding new
callback independent of @ProcessElement and @OnTimer -  @OnTrigger?


But we also probably need retractions that automatically flow
through the pipeline and update aggregations. Why? Because
currently triggers don't just control update frequency but
actually create new elements each time, so they require
user fix-up logic to do the right thing with the output. When we
go to higher levels of abstraction we need this to "just work"
without changing the pipeline. There have been two (nearly
identical) prototypes of adding retractions to the DirectRunner
as proof of concept. But there's also work in all the IOs since
they are not retraction-aware. Also lots of work in many library
transforms where a retraction should be computed by running the
transform "like normal" but then negating the result, but that
cannot be the default for ParDo because it is deliberately more
flexible, we just have to annotate Map and the like.

+1. I think retractions could be implemented as DSL on top of the
current model. Retractions can be viewed as regular data elements
with additional metadata (upsert, delete). For ParDo we could add
something like @RetractElement (and define appropriate retraction
functions to CombineFn and the like). We could introduce
RetractedPCollection or similar for this purpose.


To implement retraction (at least to do so efficiently) I think you'll 
want it integrated in the model. e.g. for combiners one would want to 
add the option to subtract values, otherwise you would end up having 
to store every element defeating the performance that combiners provide.




Getting all this right is a lot of work but would result in a
system that is simpler to use out-of-the-box and a more robust
SQL implementation (because you can't use triggers with SQL
unless you have retractions or some other "just works" mode of
computation). It would essentially change Beam into a
delta-processing engine, which it arguably should be, with whole
append-only elements being a simplest degenerate case of a delta
(which would be highly optimized in batch/archival processing).

+1


Kenn

On Tue, Apr 16, 2024 at 2:36 AM Reuven Lax via dev wrote:

Yes, but that's inevitable as stateful ParDo in a sense lives
outside of most of the window/trigger semantics. Basically a
stateful ParDo is the user exercising low-level control over
these semantics, and controlling output frequency themselves
with timers. One could however still propagate the trigger
upstream of the stateful ParDo, though I'm not sure if that's
the best approach.

On Mon, Apr 15, 2024 at 11:31 PM Jan Lukavský wrote:

On 4/11/24 18:20, Reuven Lax via dev wrote:

I'm not sure it would require all that. A "basic"
implementation could be done on top of our existing
model. Essentially the user would specify triggers at

Re: PCollection#applyWindowingStrategyInternal

2024-04-23 Thread Jan Lukavský

On 4/22/24 20:40, Kenneth Knowles wrote:

I'll go ahead and advertise https://s.apache.org/beam-sink-triggers 
again for this thread.

+1


There are a couple of difficult technical problems in there. One of 
them is backwards-propagating triggering to minimize extra latency. We 
can probably solve this as well as we solve forward-propagating 
without too much trouble. We also can/should leave it abstract so 
runners can implement either through back-propagating local triggering 
rules or using run-time communication to trigger upstream. These 
actually could interact well with stateful ParDo by sending a "trigger 
now please" message or some such.
Yes, this was what I was referring to as the more "functional" style for 
stateful ParDo. At minimum, it requires adding new callback independent 
of @ProcessElement and @OnTimer -  @OnTrigger?


But we also probably need retractions that automatically flow through 
the pipeline and update aggregations. Why? Because currently triggers 
don't just control update frequency but actually create new elements 
each time, so they require user fix-up logic to do the right thing 
with the output. When we go to higher levels of abstraction we need 
this to "just work" without changing the pipeline. There have been two 
(nearly identical) prototypes of adding retractions to the 
DirectRunner as proof of concept. But there's also work in all the IOs 
since they are not retraction-aware. Also lots of work in many library 
transforms where a retraction should be computed by running the 
transform "like normal" but then negating the result, but that cannot 
be the default for ParDo because it is deliberately more flexible, we 
just have to annotate Map and the like.
+1. I think retractions could be implemented as DSL on top of the 
current model. Retractions can be viewed as regular data elements with 
additional metadata (upsert, delete). For ParDo we could add something 
like @RetractElement (and define appropriate retraction functions to 
CombineFn and the like). We could introduce RetractedPCollection or 
similar for this purpose.


Getting all this right is a lot of work but would result in a system 
that is simpler to use out-of-the-box and a more robust SQL 
implementation (because you can't use triggers with SQL unless you 
have retractions or some other "just works" mode of computation). It 
would essentially change Beam into a delta-processing engine, which it 
arguably should be, with whole append-only elements being a simplest 
degenerate case of a delta (which would be highly optimized in 
batch/archival processing).

+1


Kenn

On Tue, Apr 16, 2024 at 2:36 AM Reuven Lax via dev wrote:


Yes, but that's inevitable as stateful ParDo in a sense lives
outside of most of the window/trigger semantics. Basically a
stateful ParDo is the user exercising low-level control over these
semantics, and controlling output frequency themselves with
timers. One could however still propagate the trigger upstream of
the stateful ParDo, though I'm not sure if that's the best approach.

On Mon, Apr 15, 2024 at 11:31 PM Jan Lukavský  wrote:

On 4/11/24 18:20, Reuven Lax via dev wrote:

I'm not sure it would require all that. A "basic"
implementation could be done on top of our existing model.
Essentially the user would specify triggers at the sink
ParDos, then the runner would walk backwards up the graph,
reverse-propagating these triggers (with some resolution
rules aimed at keeping the minimum trigger latency). The
runner could under the covers simply just apply the
appropriate trigger into the Window, using the current
mechanism. Of course building this all into the framework
from scratch would be cleaner, but we could also build this
on top of what we have.

Any propagation from sink to source would be blocked by any
stateful ParDo, because that does not adhere to the concept of
trigger, no? Hence, we could get the required downstream
'cadence' of outputs, but these would change only when the
upstream ParDo emits any data. Yes, one can argue that
stateful ParDo is supposed to emit data as fast as possible,
then this seems to work.


On Thu, Apr 11, 2024 at 5:10 AM Jan Lukavský wrote:

I've probably heard about it, but I never read the
proposal. Sounds great, but that would require changing
our ParDos from the 'directive' style to something more
functional, so that processing of elements, state updates
and outputting results can be decoupled and managed by
the runner independently. This goes exactly in the
direction of unifying GBK and Combine with stateful
ParDo. Sounds like something worth exploring for Beam 3. :)

 

Re: PCollection#applyWindowingStrategyInternal

2024-04-16 Thread Jan Lukavský

On 4/11/24 18:20, Reuven Lax via dev wrote:
I'm not sure it would require all that. A "basic" implementation could 
be done on top of our existing model. Essentially the user 
would specify triggers at the sink ParDos, then the runner would walk 
backwards up the graph, reverse-propagating these triggers (with some 
resolution rules aimed at keeping the minimum trigger latency). The 
runner could under the covers simply just apply the appropriate 
trigger into the Window, using the current mechanism. Of course 
building this all into the framework from scratch would be cleaner, 
but we could also build this on top of what we have.
Any propagation from sink to source would be blocked by any stateful 
ParDo, because that does not adhere to the concept of trigger, no? 
Hence, we could get the required downstream 'cadence' of outputs, but 
these would change only when the upstream ParDo emits any data. Yes, one 
can argue that stateful ParDo is supposed to emit data as fast as 
possible, then this seems to work.
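
The reverse propagation discussed above, including the point that a stateful ParDo stops it, can be sketched on a toy graph in plain Java. Everything here is an assumption for illustration: the `Node` type, expressing a trigger as a latency in milliseconds, and resolving conflicts by taking the minimum. It is not how any runner implements triggers today.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of reverse-propagating sink trigger latencies; not Beam API. */
final class SinkTriggerPropagation {

  /** A transform in the pipeline graph with links to its upstream transforms. */
  static final class Node {
    final String name;
    final boolean stateful;
    final List<Node> upstream = new ArrayList<>();

    Node(String name, boolean stateful) {
      this.name = name;
      this.stateful = stateful;
    }
  }

  /** Walks from each sink towards the sources, keeping the smallest latency per node. */
  static Map<String, Long> propagate(Map<Node, Long> sinkLatencyMillis) {
    Map<String, Long> resolved = new TreeMap<>();
    for (Map.Entry<Node, Long> sink : sinkLatencyMillis.entrySet()) {
      visit(sink.getKey(), sink.getValue(), resolved);
    }
    return resolved;
  }

  private static void visit(Node node, long latencyMillis, Map<String, Long> resolved) {
    if (node.stateful) {
      return; // a stateful ParDo controls its own output, so propagation stops here
    }
    Long current = resolved.get(node.name);
    if (current != null && current <= latencyMillis) {
      return; // an equally or more eager trigger already reached this node
    }
    resolved.put(node.name, latencyMillis);
    for (Node up : node.upstream) {
      visit(up, latencyMillis, resolved);
    }
  }

  public static void main(String[] args) {
    Node source = new Node("source", false);
    Node map = new Node("map", false);
    Node sinkA = new Node("sinkA", false);
    Node sinkB = new Node("sinkB", false);
    map.upstream.add(source);
    sinkA.upstream.add(map);
    sinkB.upstream.add(map);

    Map<Node, Long> sinks = new HashMap<>();
    sinks.put(sinkA, 60_000L); // this sink tolerates one minute of latency
    sinks.put(sinkB, 1_000L); // this sink wants output within a second
    System.out.println(propagate(sinks));
  }
}
```

In this model the transforms upstream of a stateful ParDo never receive a latency from the sinks behind it, which matches the objection that the downstream cadence can only change when the stateful ParDo itself emits data.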


On Thu, Apr 11, 2024 at 5:10 AM Jan Lukavský  wrote:

I've probably heard about it, but I never read the proposal.
Sounds great, but that would require changing our ParDos from the
'directive' style to something more functional, so that processing
of elements, state updates and outputting results can be decoupled
and managed by the runner independently. This goes exactly in the
direction of unifying GBK and Combine with stateful ParDo. Sounds
like something worth exploring for Beam 3. :)

Anyway, thanks for this discussion, helped me clarify some more
white spots.

 Jan

On 4/10/24 19:24, Reuven Lax via dev wrote:

Are you familiar with the "sink triggers" proposal?

Essentially while windowing is usually a property of the data,
and therefore flows downwards through the graph, triggering is
usually a property of output (i.e. sink) latency - how much are
you willing to wait to see data, and what semantics do you want
for this early data. Ideally triggers should be specified
separately at the ParDo level (Beam has no real notion of Sinks
as a special object, so to allow for output specification it has
to be on the ParDo), and the triggers should propagate up the
graph back to the source. This is in contrast to today where we
attach triggering to the windowing information.

This was a proposal some years back and there was some effort
made to implement it, but the implementation never really got off
the ground.

On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský wrote:

On 4/9/24 18:33, Kenneth Knowles wrote:

At a top level `setWindowingStrategyInternal` exists to set
up the metadata without actually assigning windows. If we
were more clever we might have found a way for it to not be
public... it is something that can easily lead to an invalid
pipeline.

Yes, that was what hit me about one minute after I started
this thread. :)


I think "compatible windows" today in Beam doesn't have very
good uses anyhow. I do see how when you are flattening
PCollections you might also want to explicitly have a
function that says "and here is how to reconcile their
different metadata". But is it not reasonable to use
Window.into(global window)? It doesn't seem like boilerplate
to me actually, but something you really want to know is
happening.


:)

Of course this was the way out, but I was somewhat
intuitively seeking something that could go this autonomously.

Generally speaking, we might have some room for improvement
in the way we handle windows and triggers - windows relate
only to GBK and stateful ParDo, triggers relate to GBK only.
They have no semantics if downstream processing does not use
any of these. There could be a pipeline preprocessing stage
that would discard (replace with meaningful defaults) any of
these metadata that is unused, but can cause Pipeline to fail
at construction time. It is also (to me) somewhat
questionable if triggers are really a property of a
PCollection or a property of a specific transform (GBK - ehm,
actually (stateless) 'key by' + 'reduce by key', but that is
completely different story :)) because (non-default) triggers
are likely not preserved across multiple transforms. Maybe
the correct subject of this thread could be "are we sure our
windowing and triggering semantics is 100% correct"? Probably
the - wrong - expectations at the beginning of this thread
were due to conflict in my mental model of how things 'could'
work as opposed to how they actually work. :)

 Jan



        Kenn

    On Tue, Apr 9, 2024 at 9:19 AM J

Re: PCollection#applyWindowingStrategyInternal

2024-04-11 Thread Jan Lukavský
I've probably heard about it, but I never read the proposal. Sounds 
great, but that would require changing our ParDos from the 'directive' 
style to something more functional, so that processing of elements, 
state updates and outputting results can be decoupled and managed by the 
runner independently. This goes exactly in the direction of unifying GBK 
and Combine with stateful ParDo. Sounds like something worth exploring 
for Beam 3. :)


Anyway, thanks for this discussion, helped me clarify some more white spots.

 Jan

On 4/10/24 19:24, Reuven Lax via dev wrote:

Are you familiar with the "sink triggers" proposal?

Essentially while windowing is usually a property of the data, and 
therefore flows downwards through the graph, triggering is usually a 
property of output (i.e. sink) latency - how much are you willing to 
wait to see data, and what semantics do you want for this early data. 
Ideally triggers should be specified separately at the ParDo level 
(Beam has no real notion of Sinks as a special object, so to allow for 
output specification it has to be on the ParDo), and the triggers 
should propagate up the graph back to the source. This is in contrast 
to today where we attach triggering to the windowing information.


This was a proposal some years back and there was some effort made to 
implement it, but the implementation never really got off the ground.


On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský  wrote:

On 4/9/24 18:33, Kenneth Knowles wrote:

At a top level `setWindowingStrategyInternal` exists to set up
the metadata without actually assigning windows. If we were more
clever we might have found a way for it to not be public... it is
something that can easily lead to an invalid pipeline.

Yes, that was what hit me about one minute after I started this
thread. :)


I think "compatible windows" today in Beam doesn't have very good
uses anyhow. I do see how when you are flattening PCollections
you might also want to explicitly have a function that says "and
here is how to reconcile their different metadata". But is it not
reasonable to use Window.into(global window)? It doesn't seem
like boilerplate to me actually, but something you really want to
know is happening.


:)

Of course this was the way out, but I was somewhat intuitively
seeking something that could go this autonomously.

Generally speaking, we might have some room for improvement in the
way we handle windows and triggers - windows relate only to GBK
and stateful ParDo, triggers relate to GBK only. They have no
semantics if downstream processing does not use any of these.
There could be a pipeline preprocessing stage that would discard
(replace with meaningful defaults) any of these metadata that is
unused, but can cause Pipeline to fail at construction time. It is
also (to me) somewhat questionable if triggers are really a
property of a PCollection or a property of a specific transform
(GBK - ehm, actually (stateless) 'key by' + 'reduce by key', but
that is completely different story :)) because (non-default)
triggers are likely not preserved across multiple transforms.
Maybe the correct subject of this thread could be "are we sure our
windowing and triggering semantics is 100% correct"? Probably the
- wrong - expectations at the beginning of this thread were due to
conflict in my mental model of how things 'could' work as opposed
to how they actually work. :)

 Jan



Kenn

On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský  wrote:

On 4/6/24 21:23, Reuven Lax via dev wrote:

So the problem here is that windowFn is a property of the
PCollection, not the element, and the result of Flatten is a
single PCollection.

Yes. That is the cause of why Flatten.pCollections() needs
the same windowFn.


In various cases, there is a notion of "compatible" windows.
Basically given window functions W1 and W2, provide a W3
that "works" with both.

Exactly this would be a nice feature for Flatten, something
like 'windowFn resolve strategy', so that if the user does not
know the windowFn of upstream PCollections this can be
somehow resolved at pipeline construction time. Alternatively
only as a small syntactic sugar, something like:
 
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

or anything similar. This can be done in user code, so it is
not something deeper, but might help in some cases. It would
be cool if we could reuse concepts from other cases where
such mechanism is needed.



Note that Beam already has something similar with side
inputs, since the side input often is in a different window
than the main in

Re: PCollection#applyWindowingStrategyInternal

2024-04-10 Thread Jan Lukavský

On 4/9/24 18:33, Kenneth Knowles wrote:
At a top level `setWindowingStrategyInternal` exists to set up the 
metadata without actually assigning windows. If we were more clever we 
might have found a way for it to not be public... it is something that 
can easily lead to an invalid pipeline.

Yes, that was what hit me about one minute after I started this thread. :)


I think "compatible windows" today in Beam doesn't have very good uses 
anyhow. I do see how when you are flattening PCollections you might 
also want to explicitly have a function that says "and here is how to 
reconcile their different metadata". But is it not reasonable to use 
Window.into(global window)? It doesn't seem like boilerplate to me 
actually, but something you really want to know is happening.


:)

Of course this was the way out, but I was somewhat intuitively seeking 
something that could go this autonomously.


Generally speaking, we might have some room for improvement in the way 
we handle windows and triggers - windows relate only to GBK and stateful 
ParDo, triggers relate to GBK only. They have no semantics if downstream 
processing does not use any of these. There could be a pipeline 
preprocessing stage that would discard (replace with meaningful 
defaults) any of these metadata that is unused, but can cause Pipeline 
to fail at construction time. It is also (to me) somewhat questionable 
if triggers are really a property of a PCollection or a property of a 
specific transform (GBK - ehm, actually (stateless) 'key by' + 'reduce 
by key', but that is completely different story :)) because 
(non-default) triggers are likely not preserved across multiple 
transforms. Maybe the correct subject of this thread could be "are we 
sure our windowing and triggering semantics are 100% correct"? Probably 
the - wrong - expectations at the beginning of this thread were due to 
conflict in my mental model of how things 'could' work as opposed to how 
they actually work. :)


 Jan



Kenn

On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský  wrote:

On 4/6/24 21:23, Reuven Lax via dev wrote:

So the problem here is that windowFn is a property of the
PCollection, not the element, and the result of Flatten is a
single PCollection.

Yes. That is the cause of why Flatten.pCollections() needs the
same windowFn.


In various cases, there is a notion of "compatible" windows.
Basically given window functions W1 and W2, provide a W3 that
"works" with both.

Exactly this would be a nice feature for Flatten, something like
'windowFn resolve strategy', so that if the user does not know the
windowFn of upstream PCollections this can be somehow resolved at
pipeline construction time. Alternatively only as a small
syntactic sugar, something like:
 
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

or anything similar. This can be done in user code, so it is not
something deeper, but might help in some cases. It would be cool
if we could reuse concepts from other cases where such mechanism
is needed.



Note that Beam already has something similar with side inputs,
since the side input often is in a different window than the main
input. However main input elements are supposed to see side input
elements in the same window (and in fact main inputs are blocked
until the side-input window is ready), so we must do a mapping.
If for example (and very commonly!) the side input is in the
global window and the main input is in a fixed window, by default
we will remap the global-window elements into the main-input's
fixed window.


This is a one-sided merge function, there is a 'main' and 'side'
input, but the generic symmetric merge might be possible as well.
E.g. if one PCollection of Flatten is in GlobalWindow, I wonder if
there are cases where users would actually want to do anything
else than apply the same global windowing strategy to all input
PCollections.

 Jan



In Side input we also allow the user to control this mapping, so
for example side input elements could always map to the previous
fixed window (e.g. while processing window 12-1, you want to see
summary data of all records in the previous window 11-12). Users
can do this by providing a WindowMappingFunction to the View -
essentially a function from window to window. Unfortunately this
is hard to use (one must create their own PCollectionView class)
and very poorly documented, so I doubt many users know about this!

    Reuven

On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský  wrote:

Immediate self-correction, although setting the strategy
directly via
setWindowingStrategyInternal() *seemed* to be working during
Pipeline
construction time, during runtime it obviously does not work,

Re: PCollection#applyWindowingStrategyInternal

2024-04-09 Thread Jan Lukavský

On 4/6/24 21:23, Reuven Lax via dev wrote:
So the problem here is that windowFn is a property of the PCollection, 
not the element, and the result of Flatten is a single PCollection.
Yes. That is the cause of why Flatten.pCollections() needs the same 
windowFn.


In various cases, there is a notion of "compatible" windows. Basically 
given window functions W1 and W2, provide a W3 that "works" with both.
Exactly this would be a nice feature for Flatten, something like 
'windowFn resolve strategy', so that if the user does not know the windowFn 
of upstream PCollections this can be somehow resolved at pipeline 
construction time. Alternatively only as a small syntactic sugar, 
something like:

 
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

or anything similar. This can be done in user code, so it is not 
something deeper, but might help in some cases. It would be cool if we 
could reuse concepts from other cases where such mechanism is needed.




Note that Beam already has something similar with side inputs, since 
the side input often is in a different window than the main input. 
However main input elements are supposed to see side input elements in 
the same window (and in fact main inputs are blocked until the 
side-input window is ready), so we must do a mapping. If for example 
(and very commonly!) the side input is in the global window and the 
main input is in a fixed window, by default we will remap the 
global-window elements into the main-input's fixed window.


This is a one-sided merge function, there is a 'main' and 'side' input, 
but the generic symmetric merge might be possible as well. E.g. if one 
PCollection of Flatten is in GlobalWindow, I wonder if there are cases 
where users would actually want to do anything else than apply the same 
global windowing strategy to all input PCollections.


 Jan



In Side input we also allow the user to control this mapping, so for 
example side input elements could always map to the previous fixed 
window (e.g. while processing window 12-1, you want to see summary 
data of all records in the previous window 11-12). Users can do this 
by providing a WindowMappingFunction to the View - essentially a 
function from window to window. Unfortunately this is hard to use (one 
must create their own PCollectionView class) and very poorly 
documented, so I doubt many users know about this!
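
To make the "function from window to window" idea concrete, here is a minimal sketch in plain Python. It does not use the Beam API: windows are modelled as `(start_hour, end_hour)` tuples on a 24-hour clock, and `same_window`, `previous_window` and `lookup_side_input` are hypothetical names chosen for illustration.

```python
from typing import Callable, Dict, Tuple

Window = Tuple[int, int]  # half-open interval [start_hour, end_hour)


def same_window(main: Window) -> Window:
    """Mapping that reads the side input from the main input's own window."""
    return main


def previous_window(main: Window) -> Window:
    """Mapping that reads the side input from the preceding fixed window."""
    start, end = main
    size = end - start
    return (start - size, start)


def lookup_side_input(
    side_by_window: Dict[Window, str],
    main_window: Window,
    mapping: Callable[[Window], Window] = same_window,
) -> str:
    """Returns the side-input value visible to an element in main_window."""
    return side_by_window[mapping(main_window)]


summaries = {
    (11, 12): "summary of 11-12",
    (12, 13): "summary of 12-13",
}

# While processing window 12-13, read the summary of window 11-12.
seen = lookup_side_input(summaries, (12, 13), previous_window)
```

Swapping the `mapping` argument is all that changes between the two behaviours, which mirrors the user-controlled mapping described above.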


Reuven

On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský  wrote:

Immediate self-correction, although setting the strategy directly via
setWindowingStrategyInternal() *seemed* to be working during Pipeline
construction time, during runtime it obviously does not work, because
the PCollection was still windowed using the old windowFn. Makes
sense to
me, but there remains the other question if we can make flattening
PCollections with incompatible windowFns more user-friendly. The
current
approach where we require the same windowFn for all input
PCollections
creates some unnecessary boilerplate code needed on user side.

  Jan

    On 4/6/24 15:45, Jan Lukavský wrote:
> Hi,
>
> I came across a case where using
> PCollection#applyWindowingStrategyInternal seems legit in user
code.
> The case is roughly as follows:
>
>  a) compute some streaming statistics
>
>  b) apply the same transform (say ComputeWindowedAggregation) with
> different parameters on these statistics yielding two windowed
> PCollections - first is global with early trigger, the other is
> sliding window, the specific parameters of the windowFns are
> encapsulated in the ComputeWindowedAggregation transform
>
>  c) apply the same transform on both of the above PCollections,
> yielding two PCollections with the same types, but different
windowFns
>
>  d) flatten these PCollections into single one (e.g. for downstream
> processing - joining - or flushing to sink)
>
> Now, the flatten will not work, because these PCollections have
> different windowFns. It would be possible to restore the
windowing for
> either of them, but it requires to somewhat break the
encapsulation of
> the transforms that produce the windowed outputs. A more natural
> solution is to take the WindowingStrategy from the global
aggregation
> and set it via setWindowingStrategyInternal() to the other
> PCollection. This works, but it uses API that is marked as
@Internal
> (and obviously, the name as well suggests it is not intended for
> client-code usage).
>
> The question is, should we make a legitimate version of this
call? Or
> should we introduce a way for Flatten.pCollections() to
re-window the
> input PCollections appropriately? In the case of conflicting
> 

Re: PCollection#applyWindowingStrategyInternal

2024-04-06 Thread Jan Lukavský
Immediate self-correction, although setting the strategy directly via 
setWindowingStrategyInternal() *seemed* to be working during Pipeline 
construction time, during runtime it obviously does not work, because 
the PCollection was still windowed using the old windowFn. Makes sense to 
me, but there remains the other question if we can make flattening 
PCollections with incompatible windowFns more user-friendly. The current 
approach where we require the same windowFn for all input PCollections 
creates some unnecessary boilerplate code needed on user side.


 Jan

On 4/6/24 15:45, Jan Lukavský wrote:

Hi,

I came across a case where using 
PCollection#applyWindowingStrategyInternal seems legit in user code. 
The case is roughly as follows:


 a) compute some streaming statistics

 b) apply the same transform (say ComputeWindowedAggregation) with 
different parameters on these statistics yielding two windowed 
PCollections - first is global with early trigger, the other is 
sliding window, the specific parameters of the windowFns are 
encapsulated in the ComputeWindowedAggregation transform


 c) apply the same transform on both of the above PCollections, 
yielding two PCollections with the same types, but different windowFns


 d) flatten these PCollections into single one (e.g. for downstream 
processing - joining - or flushing to sink)


Now, the flatten will not work, because these PCollections have 
different windowFns. It would be possible to restore the windowing for 
either of them, but it requires to somewhat break the encapsulation of 
the transforms that produce the windowed outputs. A more natural 
solution is to take the WindowingStrategy from the global aggregation 
and set it via setWindowingStrategyInternal() to the other 
PCollection. This works, but it uses API that is marked as @Internal 
(and obviously, the name as well suggests it is not intended for 
client-code usage).


The question is, should we make a legitimate version of this call? Or 
should we introduce a way for Flatten.pCollections() to re-window the 
input PCollections appropriately? In the case of conflicting 
WindowFns, where one of them is GlobalWindowing strategy, it seems to 
me that the user's intention is quite well-defined (this might extend  
to some 'flatten windowFn resolution strategy', maybe).


WDYT?

 Jan



PCollection#applyWindowingStrategyInternal

2024-04-06 Thread Jan Lukavský

Hi,

I came across a case where using 
PCollection#applyWindowingStrategyInternal seems legit in user code. The 
case is roughly as follows:


 a) compute some streaming statistics

 b) apply the same transform (say ComputeWindowedAggregation) with 
different parameters on these statistics yielding two windowed 
PCollections - first is global with early trigger, the other is sliding 
window, the specific parameters of the windowFns are encapsulated in the 
ComputeWindowedAggregation transform


 c) apply the same transform on both of the above PCollections, 
yielding two PCollections with the same types, but different windowFns


 d) flatten these PCollections into single one (e.g. for downstream 
processing - joining - or flushing to sink)


Now, the flatten will not work, because these PCollections have 
different windowFns. It would be possible to restore the windowing for 
either of them, but it requires to somewhat break the encapsulation of 
the transforms that produce the windowed outputs. A more natural 
solution is to take the WindowingStrategy from the global aggregation 
and set it via setWindowingStrategyInternal() to the other PCollection. 
This works, but it uses API that is marked as @Internal (and obviously, 
the name as well suggests it is not intended for client-code usage).


The question is, should we make a legitimate version of this call? Or 
should we introduce a way for Flatten.pCollections() to re-window the 
input PCollections appropriately? In the case of conflicting WindowFns, 
where one of them is GlobalWindowing strategy, it seems to me that the 
user's intention is quite well-defined (this might extend  to some 
'flatten windowFn resolution strategy', maybe).
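
One possible shape of such a resolution strategy, sketched in plain Python under stated assumptions: windowFns are reduced to string labels, and `resolve_flatten_window_fn` is a hypothetical helper, not an existing Beam method. The rule encoded here is only the one proposed above, namely that a conflict involving the global windowing resolves to the global windowing.

```python
from typing import Iterable

GLOBAL = "GlobalWindows"  # label standing in for the global windowFn


def resolve_flatten_window_fn(window_fns: Iterable[str]) -> str:
    """Picks the windowFn label for the output of a flatten.

    - all inputs agree: keep the common windowFn
    - inputs disagree and one of them is global: fall back to global
    - inputs disagree otherwise: refuse, the intent is ambiguous
    """
    distinct = set(window_fns)
    if not distinct:
        raise ValueError("no inputs to flatten")
    if len(distinct) == 1:
        return next(iter(distinct))
    if GLOBAL in distinct:
        return GLOBAL
    raise ValueError(f"cannot resolve windowFns: {sorted(distinct)}")


resolved = resolve_flatten_window_fn([GLOBAL, "SlidingWindows(10m, 1m)"])
```

Two different non-global windowFns are still rejected in this sketch, since the thread does not propose a rule for that case.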


WDYT?

 Jan



Re: Patch release proposal

2024-03-28 Thread Jan Lukavský

+1 to either doing full release or deferring to 2.56.0.

 Jan

On 3/28/24 16:52, Yi Hu via dev wrote:
> Just releasing Python can break multi-lang by default (unless 
expansion service is overridden manually) since we match versions 
across languages when picking the default expansion service.


Yes, that's why I proposed "the source code of release candidate (e.g. 
apache_beam/version.py) still reads 2.55.0. " Anyways it seems doing a 
full release is preferred as it reduces the risk of breakages.
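
The version-matching concern can be illustrated with a small sketch. It assumes, as described in this thread, that the Python SDK derives the default expansion service artifact from its own version string. The jar naming scheme and the function names below are illustrative only and are not taken from `subprocess_server.py`.

```python
def default_expansion_jar(sdk_version: str) -> str:
    """Hypothetical naming scheme: the Java artifact version mirrors the SDK version."""
    return f"beam-sdks-java-io-expansion-service-{sdk_version}.jar"


# Only the 2.55.0 Java artifacts exist if Python alone gets a patch release.
RELEASED_JAVA_ARTIFACTS = {default_expansion_jar("2.55.0")}


def can_resolve_default_service(version_py: str) -> bool:
    """True if the jar implied by apache_beam/version.py was actually released."""
    return default_expansion_jar(version_py) in RELEASED_JAVA_ARTIFACTS


# version.py left at 2.55.0: the default lookup still finds a released jar.
kept_version_ok = can_resolve_default_service("2.55.0")
# version.py bumped to 2.55.1: the default lookup points at a jar that was never built.
bumped_version_ok = can_resolve_default_service("2.55.1")
```

This is why the Python-only option keeps the source version at 2.55.0, while a full release avoids the mismatch altogether.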


On Thu, Mar 28, 2024 at 11:38 AM Chamikara Jayalath via dev 
 wrote:




On Thu, Mar 28, 2024 at 8:36 AM Chamikara Jayalath
 wrote:

Just releasing Python can break multi-lang by default (unless
expansion service is overridden manually) since we match
versions across languages when picking the default expansion
service.


https://github.com/apache/beam/blob/2f8854a3e34f31c1cc034f95ad36f317abc906ff/sdks/python/apache_beam/utils/subprocess_server.py#L42


Correct link:

https://github.com/apache/beam/blob/2f8854a3e34f31c1cc034f95ad36f317abc906ff/sdks/python/apache_beam/utils/subprocess_server.py#L352



Thanks,
Cham

On Thu, Mar 28, 2024 at 8:26 AM Danny McCormick via dev
 wrote:

> The patch itself [1] is trivial, however, the release
process is not trivial. There is little documentation or
practice for a patch release process. I could imagine two
options

I think there's not a ton of documentation because we
haven't done it, but all the release workflows were
authored in such a way that they should "just work",
outside of cutting the release branch itself. So the
workflow should be almost identical to the existing one,
but with several steps skipped (cherry picks, beam
website, most validation). Notably, this shouldn't be any
easier/harder if we're doing it for one language or all 3.

I can take that on if needed.

> Besides, there should be a Beam YAML validation workflow
and added in

https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1368030253

> If we do a patch release for Python SDK, let's also
patch another known issue for which fix is available:
https://github.com/apache/beam/blob/master/CHANGES.md#known-issues-1

+1 to both of these

On Thu, Mar 28, 2024 at 11:25 AM Yi Hu via dev
 wrote:

Thanks Valentyn for raising this. In this case, Python
containers will also be included. Unlike PyPI
wheels, a Docker tag can be overwritten, so it can stay at 2.55.0

On Thu, Mar 28, 2024 at 11:15 AM Valentyn Tymofieiev
 wrote:

If we do a patch release for Python SDK, let's
also patch another known issue for which fix is
available:

https://github.com/apache/beam/blob/master/CHANGES.md#known-issues-1

On Thu, Mar 28, 2024 at 8:01 AM Yi Hu via dev
 wrote:

2.55.0 release manager here

The patch itself [1] is trivial, however, the
release process is not trivial. There is
little documentation or practice for a patch
release process. I could imagine two options

1. Do a full "2.55.1" release

2. Do a patch release only for Python SDK, that is
  a. cherry-pick [1] into release-2.55.0 branch
  b. tag a 2.55.1rc1 release candidate - note
that the source code of release candidate
(e.g. apache_beam/version.py) still reads
2.55.0. This ensures Python SDK picks up the
Java expansion service / job server of
existing version (2.55.0). We did it once for
Go SDK
(https://github.com/apache/beam/tree/sdks/v2.48.2)
  c. Build the release candidate for Python
wheels (also Python containers? Not sure if it
is needed)
  d. send out the RC for validation
  e. finalize the release

If we decide to do a patch release I would
prefer option 2. I can take that on if we decide
to do so. However, if we decide to do a full release
(of both Java and Python) I would suggest
deferring to the next release cycle, as the release
process 

Re: [VOTE] Release 2.55.0, release candidate #3

2024-03-21 Thread Jan Lukavský

+1 (binding)

Tested Java SDK with FlinkRunner.

 Jan

On 3/20/24 22:40, Chamikara Jayalath via dev wrote:

+1 (binding)

Tested multi-lang Java/Python pipelines and upgrading BQ/Kafka 
transforms from 2.53.0 to 2.55.0 using the Transform Service.


Thanks,
Cham

On Tue, Mar 19, 2024 at 2:10 PM XQ Hu via dev  wrote:

+1 (non-binding). Ran the simple ML pipeline without any issue:
https://github.com/google/dataflow-ml-starter/actions/runs/8349158153

On Tue, Mar 19, 2024 at 11:55 AM Ritesh Ghorse via dev
 wrote:

+1 (non-binding) - Ran a few python batch examples on Direct
and Dataflow runner.

Thanks!

On Tue, Mar 19, 2024 at 10:56 AM Yi Hu via dev
 wrote:

Hi everyone,
Please review and vote on the release candidate #3 for the
version 2.55.0, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide
specific comments)


Reviewers are encouraged to test their own use cases with
the release candidate, and vote +1 if
no issues are found. Only PMC member votes will count
towards the final vote, but votes from all
community members are encouraged and helpful for finding
regressions; you can either test your own
use cases [13] or use cases from the validation sheet [10].

The complete staging area is available for your review,
which includes:
* GitHub Release notes [1],
* the official Apache source release to be deployed to
dist.apache.org  [2], which is
signed with the key with fingerprint D20316F712213422 [3],
* all artifacts to be deployed to the Maven Central
Repository [4],
* source code tag "v2.55.0-RC3" [5],
* website pull request listing the release [6], the blog
post [6], and publishing the API reference manual [7].
* Python artifacts are deployed along with the source
release to the dist.apache.org 
[2] and PyPI [8].
* Go artifacts and documentation are available at
pkg.go.dev  [9]
* Validation sheet with a tab for 2.55.0 release to help
with validation [10].
* Docker images published to Docker Hub [11].
* PR to run tests against release branch [12].

The vote will be open for at least 72 hours. It is adopted
by majority approval, with at least 3 PMC affirmative votes.

For guidelines on how to try the release in your projects,
check out our RC testing guide [13].

Thanks,
Release Manager

[1] https://github.com/apache/beam/milestone/19
[2] https://dist.apache.org/repos/dist/dev/beam/2.55.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1373/
[5] https://github.com/apache/beam/tree/v2.55.0-RC3
[6] https://github.com/apache/beam/pull/30607
[7] https://github.com/apache/beam-site/pull/661
[8] https://pypi.org/project/apache-beam/2.55.0rc3/
[9] https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.55.0-RC3/go/pkg/beam
[10] https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1368030253
[11] https://hub.docker.com/search?q=apache%2Fbeam=image

[12] https://github.com/apache/beam/pull/30569
[13] https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md


--

Yi Hu, (he/him/his)

Software Engineer



Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-29 Thread Jan Lukavský
Going a little further, instead of the CombineFn in (b), we might try to 
solve the problem of incorporating iterations into the model. Iterations 
(backloops) working without event-time timers (i.e. processing-time timers 
only or no timers at all) should not interfere with watermarks and 
therefore would not create the problem of "vector watermarks". The 
Throttle transform would then use the backloop as a feedback loop to 
slow down the request rate.


On 2/29/24 14:57, Jan Lukavský wrote:
From my understanding Flink rate limits based on local information 
only. On the other hand - in case of Flink - this should easily extend 
to global information, because the parallelism for both batch and 
streaming is set before job is launched and remains unchanged (until 
possible manual rescaling). There is a possibility of adaptive 
scheduling [1] which would then probably require communication of the 
parallelism to workers (I'd guess this is not implemented).


Regarding the other points - I'd be in favor of the following:

 a) batch timers - trying to extend the current definition of 
processing time timers to batch without introducing a new primitive, so 
in an extended, backwards compatible way (presumably mostly 
terminating condition?)


 b) we could define a CombineFn that would accumulate data from 
workers and provide accumulated results in defined tumbling windows 
back to workers - this could be reused for Throttle, watermark 
alignment, and probably others
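
Point (b) can be sketched roughly as follows. This is a toy model in plain Python, not a Beam CombineFn: workers report request counts with integer processing-time timestamps, the reports are summed per tumbling window, and a per-window signal is derived that could be sent back to the workers. All names and the budget value are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# (worker_id, processing_time_sec, request_count)
Report = Tuple[str, int, int]


def window_start(timestamp: int, size: int) -> int:
    """Start of the tumbling window of the given size containing timestamp."""
    return timestamp - (timestamp % size)


def combine_reports(reports: Iterable[Report], size: int) -> Dict[int, int]:
    """Sums request counts from all workers per tumbling window."""
    totals: Dict[int, int] = defaultdict(int)
    for _worker, ts, count in reports:
        totals[window_start(ts, size)] += count
    return dict(totals)


def feedback(totals: Dict[int, int], budget_per_window: int) -> Dict[int, bool]:
    """Per-window signal for the workers: True means the global budget was exceeded."""
    return {start: total > budget_per_window for start, total in totals.items()}


reports = [("w1", 0, 10), ("w2", 3, 5), ("w1", 12, 7), ("w2", 19, 1)]
totals = combine_reports(reports, size=10)
over_budget = feedback(totals, budget_per_window=12)
```

How the accumulated result travels back to the workers is the open part of the proposal and is not modelled here.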


Best,

 Jan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/



On 2/28/24 19:37, Robert Burke wrote:
Sounds like a different variation is either new timer types with 
those distinctions in mind, or additional configuration for 
ProcessingTime timers (defaulting to current behavior) to sort out 
those cases. Could potentially be extended to EventTime timers too 
for explicitly handling looping timer cases (eg. To signal: This 
DoFn's OnWindowExpiry method manages the consequences of this timer's 
effect of a Drain. Or similar. Or we put that as an additional 
configuration for OnWindowExpiry, along with Drain Awareness...)


I got curious and looked loosely at how Flink solves this problem: 
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/


In short, an explicit rate limiting strategy. The surface glance 
indicates that it relies on local in memory state, but actual use of 
these things seems relegated to abstract classes (eg for Sinks and 
similar). It's not clear to me whether there is cross worker 
coordination happening there, or it's assumed to be all on a single 
machine anyway. I'm unfamiliar with how Flink operates, so I can't say.


I think I'd be happiest if we could build into Beam a mechanism / 
paired primitive where such a Cross Worker Communication Pair (the 
processor/server + DoFn client) could be built, but not purely be 
limited to Rate limiting/Throttling. Possibly mumble mumble 
StatePipe? But that feels like a harder problem for the time being.


Robert Burke

On 2024/02/28 08:25:35 Jan Lukavský wrote:

On 2/27/24 19:49, Robert Bradshaw via dev wrote:

On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:

On 2/27/24 19:22, Robert Bradshaw via dev wrote:
On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles 
 wrote:

Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
 wrote:
I can't act on something yet [...] but I expect to be able to 
[...] at some time in the processing-time future.
I like this as a clear and internally-consistent feature 
description. It describes ProcessContinuation and those timers 
which serve the same purpose as ProcessContinuation.


On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
 wrote:
I can't think of a batch or streaming scenario where it would 
be correct to not wait at least that long
The main reason we created timers: to take action in the absence 
of data. The archetypal use case for processing time timers 
was/is "flush data from state if it has been sitting there too 
long". For this use case, the right behavior for batch is to 
skip the timer. It is actually basically incorrect to wait.
Good point calling out the distinction between "I need to wait in 
case

there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).
Runners signal end of data to a DoFn via (input) watermark. Is 
there a

need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) wat

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-29 Thread Jan Lukavský
From my understanding Flink rate limits based on local information 
only. On the other hand - in case of Flink - this should easily extend 
to global information, because the parallelism for both batch and 
streaming is set before job is launched and remains unchanged (until 
possible manual rescaling). There is a possibility of adaptive 
scheduling [1] which would then probably require communication of the 
parallelism to workers (I'd guess this is not implemented).


Regarding the other points - I'd be in favor of the following:

 a) batch timers - trying to extend the current definition of 
processing time timers to batch without introducing a new primitive, so 
in an extended, backwards compatible way (presumably mostly terminating 
condition?)


 b) we could define a CombineFn that would accumulate data from workers 
and provide accumulated results in defined tumbling windows back to 
workers - this could be reused for Throttle, watermark alignment, and 
probably others


Best,

 Jan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/



On 2/28/24 19:37, Robert Burke wrote:

Sounds like a different variation is either new timer types with those 
distinctions in mind, or additional configuration for ProcessingTime timers 
(defaulting to current behavior) to sort out those cases. Could potentially be 
extended to EventTime timers too for explicitly handling looping timer cases 
(eg. To signal: This DoFn's OnWindowExpiry method manages the consequences of 
this timer's effect of a Drain. Or similar. Or we put that as an additional 
configuration for OnWindowExpiry, along with Drain Awareness...)

I got curious and looked loosely at how Flink solves this problem:  
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/

In short, an explicit rate limiting strategy. The surface glance indicates that 
it relies on local in memory state, but actual use of these things seems 
relegated to abstract classes (eg for Sinks and similar). It's not clear to me 
whether there is cross worker coordination happening there, or it's assumed to 
be all on a single machine anyway. I'm unfamiliar with how Flink operates, so I 
can't say.

I think I'd be happiest if we could build into Beam a mechanism / paired 
primitive where such a Cross Worker Communication Pair (the processor/server + 
DoFn client) could be built, but not purely be limited to Rate 
limiting/Throttling. Possibly mumble mumble StatePipe? But that feels like a 
harder problem for the time being.

Robert Burke

On 2024/02/28 08:25:35 Jan Lukavský wrote:

On 2/27/24 19:49, Robert Bradshaw via dev wrote:

On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:

On 2/27/24 19:22, Robert Bradshaw via dev wrote:

On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:

Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't act on something yet [...] but I expect to be able to [...] at some 
time in the processing-time future.

I like this as a clear and internally-consistent feature description. It 
describes ProcessContinuation and those timers which serve the same purpose as 
ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't think of a batch or streaming scenario where it would be correct to not 
wait at least that long

The main reason we created timers: to take action in the absence of data. The archetypal 
use case for processing time timers was/is "flush data from state if it has been 
sitting there too long". For this use case, the right behavior for batch is to skip 
the timer. It is actually basically incorrect to wait.
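
A toy simulation of this "flush if it has been sitting too long" case, in plain Python and without any Beam API. `run_buffering` is a hypothetical helper: in streaming mode the buffer is flushed when the processing-time deadline passes, while in batch mode the timer is skipped and the buffer is flushed as soon as the input ends.

```python
from typing import List, Tuple


def run_buffering(
    arrivals: List[Tuple[int, str]], flush_after: int, batch: bool
) -> List[Tuple[int, List[str]]]:
    """Simulates a buffering step with a processing-time flush timer.

    arrivals: (processing_time, element) pairs sorted by time.
    Returns (flush_time, flushed_elements) pairs.
    """
    flushes: List[Tuple[int, List[str]]] = []
    buffer: List[str] = []
    deadline = None
    for t, element in arrivals:
        # Streaming only: the timer fires once its deadline has passed.
        if not batch and deadline is not None and t >= deadline:
            flushes.append((deadline, buffer))
            buffer, deadline = [], None
        buffer.append(element)
        if deadline is None:
            deadline = t + flush_after
    if buffer:
        last_t = arrivals[-1][0]
        # End of input: streaming still waits for the deadline,
        # batch knows no more data can arrive and flushes right away.
        flushes.append((last_t if batch else deadline, buffer))
    return flushes


arrivals = [(0, "a"), (1, "b"), (7, "c")]
streaming = run_buffering(arrivals, flush_after=5, batch=False)
batch = run_buffering(arrivals, flush_after=5, batch=True)
```

The sketch covers only the "wait in case there is more data" kind of timer. A timer that waits for something external cannot be skipped this way, which is the distinction drawn in the reply below.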

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).

Runners signal end of data to a DoFn via (input) watermark. Is there a
need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to
trigger.

+1



Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-28 Thread Jan Lukavský



On 2/27/24 19:49, Robert Bradshaw via dev wrote:

On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:

On 2/27/24 19:22, Robert Bradshaw via dev wrote:

On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:

Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't act on something yet [...] but I expect to be able to [...] at some 
time in the processing-time future.

I like this as a clear and internally-consistent feature description. It 
describes ProcessContinuation and those timers which serve the same purpose as 
ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't think of a batch or streaming scenario where it would be correct to not 
wait at least that long

The main reason we created timers: to take action in the absence of data. The archetypal 
use case for processing time timers was/is "flush data from state if it has been 
sitting there too long". For this use case, the right behavior for batch is to skip 
the timer. It is actually basically incorrect to wait.

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).

Runners signal end of data to a DoFn via (input) watermark. Is there a
need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to
trigger.

+1


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Jan Lukavský

On 2/27/24 19:30, Robert Bradshaw via dev wrote:

On Tue, Feb 27, 2024 at 7:44 AM Robert Burke  wrote:

An "as fast as it can" runner with dynamic splits would ultimately split to the system's 
maximum available parallelism (for stateful DoFns, this is the number of keys; for SplittableDoFns, 
this is the maximum sharding of each input element's restriction). That's what would happen with a 
"normal" sleep.

WRT Portability, this means adding a current ProcessingTime field to the 
ProcessBundleRequest, and likely also to the ProgressRequest so the runner could 
coordinate. ProgressResponse may then need an "asleepUntil" field to communicate 
back the state of the bundle, which the runner could then use to better time its next 
ProgressRequest, and potentially arrest dynamic splitting for that bundle. After all, the 
sleeping bundle is blocked until processing time has advanced anyway; no progress can be 
made.
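
A rough sketch of how a runner might use such a field, in plain Python. The `asleep_until` attribute is the hypothetical "asleepUntil" field proposed above and does not exist in the current protocol. The dataclass and helper names are likewise made up for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProgressResponse:
    """Toy stand-in for a progress response carrying the proposed field."""
    bundle_id: str
    asleep_until: Optional[int] = None  # hypothetical: processing time when the bundle wakes


def next_progress_request_time(
    now: int, default_interval: int, response: ProgressResponse
) -> int:
    """Polls at the usual cadence, but never before a sleeping bundle wakes up."""
    if response.asleep_until is None:
        return now + default_interval
    return max(now + default_interval, response.asleep_until)


def may_split(now: int, response: ProgressResponse) -> bool:
    """Dynamic splitting is held back while the bundle reports that it is asleep."""
    return response.asleep_until is None or response.asleep_until <= now


awake = ProgressResponse("bundle-1")
asleep = ProgressResponse("bundle-2", asleep_until=100)

next_for_awake = next_progress_request_time(now=10, default_interval=5, response=awake)
next_for_asleep = next_progress_request_time(now=10, default_interval=5, response=asleep)
```

Deferring the next request to the wake-up time also addresses the over-throttling risk mentioned below, since the runner adapts its cadence to the sleep.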

I like moving the abstraction out of the timer space, as it better aligns with 
user intent for the throttle case, and it doesn't require a Stateful DoFn to 
operate (orthogonal!), meaning it's useful for It also solves the testing issue 
WRT ProcessingTime timers using an absolute time, rather than a relative time, 
as the SDK can rebuild its relative setters for output time on the new 
canonical processing time, without user code changing.

The sleeping inprogress bundle naturally holds back the watermark too.

I suspect this mechanism would end up tending to over throttle as Reuven 
described earlier, since the user is only pushing back on immediate processing 
for the current element, not necessarily all elements. This is particularly 
likely if there's a long gap between ProgressRequests for the bundle and the 
runner doesn't adapt its cadence.

An external source of rate doesn't really exist, other than some external 
source that can provide throttle information. There would remain time skew 
between the runner system and the external system though, but for a throttle 
that's likely fine.

A central notion of ProcessingTime also allows the runner to "smear" processing 
time so if there's a particularly long delay, it doesn't need to catch up at once. I 
don't think that's relevant for the throttle case though, since with the described clock 
mechanism and the communication back to the runner, the unblocking notion is probably 
fine.

On this note, I have become skeptical that a global throttling rate
can be done well with local information.

For streaming dataflow, we can have an approximate solution by knowing
the number of keys and doing per-key throttling because keys (at least
up to hundreds per worker) are all processed concurrently. This
solution doesn't even require state + timers and would best be done by
standard sleeps.

For most other systems, including dataflow batch, this would massively
under throttle. Here we need to either add something to the model, or
do something outside the model, to discover, dynamically, how many
siblings are being concurrently run. (This could be done at a
worker/process level, rather than bundle level, as well.) The ability
to broadcast, aggregate, and read dynamic, provisional from all
workers could help in other cases too (e.g. a more efficient top N),
but this is a whole new thread...

So while I think the semantics of processing timers in batch is worth
solving, this probably isn't the best application.
Yes, it seems that under the assumption of dynamic parallelism defined 
by the runner, defining a global throttling rate is not possible under the 
current model. But maybe (rather than introducing a whole new concept) 
we could propagate the information about current parallelism from runner 
to DoFn via ProcessContext? For some runners that would be as easy as 
returning a constant. Dynamic runners would be more involved, but the 
only other option than propagating parallelism from runner to workers 
seems to be introduction of a whole new worker <-> runner communication 
channel, so that worker could ask runner for a permission to proceed 
with processing data based on some (global) condition. It feels somewhat 
too complex given the motivating example. Maybe there could be others so 
that this could be generalized to a concept, what comes to mind is 
something Flink calls "watermark alignment", which throttles sources 
based on the event-time progress of individual partitions, so that 
partitions that are too ahead of time do not blow up downstream state. 
These might be related concepts.



We'd need a discussion of what an SDK must do if the runner doesn't support the 
central clock for completeness, and consistency.


On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský  wrote:

On 2/27/24 14:51, Kenneth Knowles wrote:

I very much like the idea of processing time clock as a parameter to 
@ProcessElement. That will be obviously useful and remove a source of 
inconsistency, in addition to letting 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Jan Lukavský

On 2/27/24 19:22, Robert Bradshaw via dev wrote:

On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:

Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't act on something yet [...] but I expect to be able to [...] at some 
time in the processing-time future.

I like this as a clear and internally-consistent feature description. It 
describes ProcessContinuation and those timers which serve the same purpose as 
ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't think of a batch or streaming scenario where it would be correct to not 
wait at least that long

The main reason we created timers: to take action in the absence of data. The archetypal 
use case for processing time timers was/is "flush data from state if it has been 
sitting there too long". For this use case, the right behavior for batch is to skip 
the timer. It is actually basically incorrect to wait.

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).
Runners signal end of data to a DoFn via (input) watermark. Is there a 
need for additional information?



On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:

It doesn't require a new primitive.

IMO what's being proposed *is* a new primitive. I think it is a good primitive. 
It is the underlying primitive to ProcessContinuation. It would be 
user-friendly as a kind of timer. But if we made this the behavior of 
processing time timers retroactively, it would break everyone using them to 
flush data who is also reprocessing data.

There's two very different use cases ("I need to wait, and block data" vs "I want to 
act without data, aka NOT wait for data") and I think we should serve both of them, but it 
doesn't have to be with the same low-level feature.

Kenn


On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:

While I'm currently on the other side of the fence, I would not be against changing/requiring the semantics of 
ProcessingTime constructs to be "must wait and execute" as such a solution, and enables the Proposed 
"batch" process continuation throttling mechanism to work as hypothesized for both "batch" and 
"streaming" execution.

There's a lot to like, as it leans Beam further into the unification of Batch 
and Stream, with one fewer exception (eg. unifies timer experience further). It 
doesn't require a new primitive. It probably matches more with user 
expectations anyway.

It does cause looping timer execution with processing time to be a problem for 
Drains however.

I think we have a problem with looping timers plus drain (a mostly
streaming idea anyway) regardless.


I'd argue though that in the case of a drain, we could update the semantics as "move 
watermark to infinity"  "existing timers are executed, but new timers are ignored",

I don't like the idea of dropping timers for drain. I think correct
handling here requires user visibility into whether a pipeline is
draining or not.


and ensure/and update the requirements around OnWindowExpiration callbacks to be a bit 
more insistent on being implemented for correct execution, which is currently the only 
"hard" signal to the SDK side that the window's work is guaranteed to be over, 
and remaining state needs to be addressed by the transform or be garbage collected. This 
remains critical for developing a good pattern for ProcessingTime timers within a Global 
Window too.

+1


On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:

Thanks for bringing this up.

My position is that both batch and streaming should wait for
processing time timers, according to local time (with the exception of
tests that can accelerate this via faked clocks).

Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
isomorphic, and can be implemented in terms of each other (at least in
one direction, and likely the other). Both are an indication that I
can't act on something yet due to external constraints (e.g. not all
the data has been published, or I lack sufficient capacity/quota to
push things downstream) but I expect to be able to (or at least would
like to check again) at some time in the processing-time future. I
can't think of a batch or streaming scenario where it would be correct
to not wait at least that long (even in batch inputs, e.g. suppose I'm
tailing logs and was eagerly started before they were fully written,
or waiting for some kind of (non-data-dependent) quiescence or other
operation to finish).


On Fri, Feb 23, 2024 at 12:36 AM Ja

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Jan Lukavský

On 2/27/24 16:36, Robert Burke wrote:
An "as fast as it can runner" with dynamic splits, would ultimately 
split to the system's maximum available parallelism (for stateful 
DoFns, this is the number of keys; for SplittableDoFns, this is the 
maximum sharding of each input element's restriction). That's what 
would happen with a "normal" sleep.
I see. It is definitely possible for a runner to split all processing to 
maximum parallelism, but - provided this cannot be controlled by user - 
can the semantics of the Throttle transform be even consistently defined 
in terms of processing time? Seems it would require a coordination with 
the runner so that user-code would at least be aware of current 
parallelism. The situation is easier for runners that set parallelism 
upfront.


WRT Portability, this means adding a current ProcessingTime field to 
the ProcessBundleRequest, and likely also to the ProgressRequest so 
the runner could coordinate. ProgressResponse may then need a 
"asleepUntil" field to communicate back the state of the bundle, which 
the runner could then use to better time its next ProgressRequest, and 
potentially arrest dynamic splitting for that bundle. After all, the 
sleeping bundle is blocked until processing time has advanced anyway; 
no progress can be made.


I like moving the abstraction out of the timer space, as it better 
aligns with user intent for the throttle case, and it doesn't require 
a Stateful DoFn to operate (orthogonal!), meaning it's useful for It 
also solves the testing issue WRT ProcessingTime timers using an 
absolute time, rather than a relative time, as the SDK can rebuild 
its relative setters for output time on the new canonical processing 
time, without user code changing.
With what was said above - is the definition of sleep (pause) valid in 
the context of a bundle? By the same logic of splitting keys, a 
"fast and efficient enough" runner could delay only the paused bundle and start 
processing a different bundle (via a different DoFn). It might require 
splitting bundles by keys, but should be possible. Seems that would in 
the end make the feature useless as well.


The sleeping inprogress bundle naturally holds back the watermark too.

I suspect this mechanism would end up tending to over throttle as 
Reuven described earlier, since the user is only pushing back on 
immediate processing for the current element, not necessarily all 
elements. This is particularly likely if there's a long gap between 
ProgressRequests for the bundle and the runner doesn't adapt its cadence.


An external source of rate doesn't really exist, other than some 
external source that can provide throttle information. There would 
remain time skew between the runner system and the external system 
though, but for a throttle that's likely fine.


A central notion of ProcessingTime also allows the runner to "smear" 
processing time so if there's a particularly long delay, it doesn't 
need to catch up at once. I don't think that's relevant for the 
throttle case though, since with the described clock mechanism and the 
communication back to the runner, the unblocking notion is probably fine.


We'd need a discussion of what an SDK must do if the runner doesn't 
support the central clock for completeness, and consistency.



On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský  wrote:

On 2/27/24 14:51, Kenneth Knowles wrote:

I very much like the idea of processing time clock as a parameter
to @ProcessElement. That will be obviously useful and remove a
source of inconsistency, in addition to letting the runner/SDK
harness control it. I also like the idea of passing a Sleeper or
to @ProcessElement. These are both good practices for testing and
flexibility and runner/SDK language differences.

In your (a) (b) (c) can you be more specific about which
watermarks you are referring to? Are they the same as in my
opening email? If so, then what you describe is what we already have.

Yes, we have that for streaming, but it does not work this way in
batch. In my understanding we violate (a), we ignore (b) because
we fire timers at GC time only and (c) is currently relevant only
immediately preceding window GC time, but can be defined more
generally. But essentially yes, I was just trying to restate the
streaming processing time semantics in the limited batch case.


Kenn

On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský  wrote:

I think that before we introduce a possibly somewhat
duplicate new feature we should be certain that it is really
semantically different. I'll rephrase the two cases:

 a) need to wait and block data (delay) - the use case is the
motivating example of Throttle transform

 b) act without data, not block

Provided we align processing time with local machine clock
(or better, because of testing, make cur

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Jan Lukavský

On 2/27/24 14:51, Kenneth Knowles wrote:
I very much like the idea of processing time clock as a parameter 
to @ProcessElement. That will be obviously useful and remove a source 
of inconsistency, in addition to letting the runner/SDK harness 
control it. I also like the idea of passing a Sleeper or 
to @ProcessElement. These are both good practices for testing and 
flexibility and runner/SDK language differences.


In your (a) (b) (c) can you be more specific about which watermarks 
you are referring to? Are they the same as in my opening email? If so, 
then what you describe is what we already have.
Yes, we have that for streaming, but it does not work this way in batch. 
In my understanding we violate (a), we ignore (b) because we fire timers 
at GC time only and (c) is currently relevant only immediately preceding 
window GC time, but can be defined more generally. But essentially yes, 
I was just trying to restate the streaming processing time semantics in 
the limited batch case.


Kenn

On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský  wrote:

I think that before we introduce a possibly somewhat duplicate new
feature we should be certain that it is really semantically
different. I'll rephrase the two cases:

 a) need to wait and block data (delay) - the use case is the
motivating example of Throttle transform

 b) act without data, not block

Provided we align processing time with local machine clock (or
better, because of testing, make current processing time available
via context to @ProcessElement) it seems possible to unify both
cases under slightly updated semantics of processing time timer in
batch:

 a) processing time timers fire with best-effort, i.e. trying to
minimize delay between firing timestamp and timer's timestamp
 b) timer is valid only in the context of current key-window, once
watermark passes window GC time for the particular window that
created the timer, it is ignored
 c) if timer has output timestamp, this timestamp holds watermark
(but this is currently probably noop, because runners currently do
not propagate (per-key) watermark in batch, I assume)

In case b) it might be necessary to distinguish cases when the timer
has an output timestamp; if so, it probably should be taken into account.

Now, such semantics should be quite aligned with what we do in
streaming case and what users generally expect. The blocking part
can be implemented in @ProcessElement using buffer & timer, once
there is need to wait, it can be implemented in user code using
plain sleep(). That is due to the alignment between local time and
definition of processing time. If we had some reason to be able to
run faster-than-wall-clock (as I'm still not in favor of that), we
could do that using ProcessContext.sleep(). Delaying processing in
the @ProcessElement should result in backpressuring and
backpropagation of this backpressure from the Throttle transform
to the sources as mentioned (of course this is only for the
streaming case).

Is there anything missing in such definition that would still
require splitting the timers into two distinct features?

 Jan

On 2/26/24 21:22, Kenneth Knowles wrote:

Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.

OutputTime is always an event time timestamp so it isn't even
allowed to be set outside the window (or you'd end up with an
element assigned to a window that it isn't within, since
OutputTime essentially represents reserving the right to output
an element with that timestamp)

Kenn

On Mon, Feb 26, 2024 at 3:19 PM Robert Burke 
wrote:

Agreed that a retroactive behavior change would be bad, even
if tied to a beam version change. I agree that it meshes well
with the general theme of State & Timers exposing underlying
primitives for implementing Windowing and similar. I'd say
the distinction between the two might be additional
complexity for users to grok, and would need to be documented
well, as both operate in the ProcessingTime domain, but
differently.

What to call this new timer then? DelayTimer?

"A DelayTimer sets an instant in ProcessingTime at which
point computations can continue. Runners will prevent the
EventTimer watermark from advancing past the set OutputTime
until Processing Time has advanced to at least the provided
instant to execute the timer's callback. This can be used to
allow the runner to constrain pipeline throughput with user
guidance."

I'd probably add that a timer with an output time outside of
the window would not be guaranteed to fire, and that
OnWindowExpiry is the correct way to ensure cleanup occurs.

No solution to the Looping Timers on Drain problem here, but

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-26 Thread Jan Lukavský
 to skip the timer. It is actually basically incorrect
to wait.

On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
 wrote:
> It doesn't require a new primitive.

IMO what's being proposed *is* a new primitive. I think it is
a good primitive. It is the underlying primitive to
ProcessContinuation. It would be user-friendly as a kind of
timer. But if we made this the behavior of processing time
timers retroactively, it would break everyone using them to
flush data who is also reprocessing data.

There's two very different use cases ("I need to wait, and
block data" vs "I want to act without data, aka NOT wait for
data") and I think we should serve both of them, but it
doesn't have to be with the same low-level feature.

Kenn


On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
 wrote:

On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
 wrote:
>
> While I'm currently on the other side of the fence, I
would not be against changing/requiring the semantics of
ProcessingTime constructs to be "must wait and execute" as
such a solution, and enables the Proposed "batch" process
continuation throttling mechanism to work as hypothesized
for both "batch" and "streaming" execution.
>
> There's a lot to like, as it leans Beam further into the
unification of Batch and Stream, with one fewer exception
(eg. unifies timer experience further). It doesn't require
a new primitive. It probably matches more with user
expectations anyway.
>
> It does cause looping timer execution with processing
time to be a problem for Drains however.

I think we have a problem with looping timers plus drain
(a mostly
streaming idea anyway) regardless.

> I'd argue though that in the case of a drain, we could
update the semantics as "move watermark to infinity" 
"existing timers are executed, but new timers are ignored",

I don't like the idea of dropping timers for drain. I
think correct
handling here requires user visibility into whether a
pipeline is
draining or not.

> and ensure/and update the requirements around
OnWindowExpiration callbacks to be a bit more insistent on
being implemented for correct execution, which is
currently the only "hard" signal to the SDK side that the
window's work is guaranteed to be over, and remaining
state needs to be addressed by the transform or be garbage
collected. This remains critical for developing a good
pattern for ProcessingTime timers within a Global Window too.

+1

>
> On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> > Thanks for bringing this up.
> >
> > My position is that both batch and streaming should
wait for
> > processing time timers, according to local time (with
the exception of
> > tests that can accelerate this via faked clocks).
> >
> > Both ProcessContinuations delays and
ProcessingTimeTimers are IMHO
> > isomorphic, and can be implemented in terms of each
other (at least in
> > one direction, and likely the other). Both are an
indication that I
> > can't act on something yet due to external constraints
(e.g. not all
> > the data has been published, or I lack sufficient
capacity/quota to
> > push things downstream) but I expect to be able to (or
at least would
> > like to check again) at some time in the
processing-time future. I
> > can't think of a batch or streaming scenario where it
would be correct
> > to not wait at least that long (even in batch inputs,
e.g. suppose I'm
> > tailing logs and was eagerly started before they were
    fully written,
> > or waiting for some kind of (non-data-dependent)
quiescence or other
> > operation to finish).
> >
> >
> > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský
 wrote:
> > >
> > > For me it always helps to seek analogy in our
physical r

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-23 Thread Jan Lukavský
For me it always helps to seek analogy in our physical reality. Stream 
processing actually has quite a good analogy for both event-time and 
processing-time - the simplest model for this being relativity theory. 
Event-time is the time at which events occur _at distant locations_. Due 
to finite and invariant speed of light (which is actually really 
involved in the explanation why any stream processing is inevitably 
unordered) these events are observed (processed) at different times 
(processing time, different for different observers). It is perfectly 
possible for an observer to observe events at a rate that is higher than 
one second per second. This also happens in reality for observers that 
travel at relativistic speeds (which might be an analogy for fast - 
batch - (re)processing). Besides the invariant speed, there is also 
another invariant - local clock (wall time) always ticks exactly at the 
rate of one second per second, no matter what. It is not possible to 
"move faster or slower" through (local) time.


In my understanding the reason why we do not put any guarantees or 
bounds on the delay of firing processing time timers is purely technical 
- the processing is (per key) single-threaded, thus any timer has to 
wait until any element processing finishes. This is only a consequence of 
a technical solution, not something fundamental.


Having said that, my point is that according to the above analogy, it 
should be perfectly fine to fire processing time timers in batch based 
on (local wall) time only. There should be no way of manipulating this 
local time (excluding tests). Watermarks should be affected the same way 
as any buffering in a state that would happen in a stateful DoFn (i.e. 
set timer holds output watermark). We should probably pay attention to 
looping timers, but it seems possible to define a valid stopping 
condition (input watermark at infinity).


 Jan

On 2/22/24 19:50, Kenneth Knowles wrote:

Forking this thread.

The state of processing time timers in this mode of processing is not 
satisfactory and is discussed a lot but we should make everything 
explicit.


Currently, a state and timer DoFn has a number of logical watermarks: 
(apologies for fixed width not coming through in email lists). Treat 
timers as a back edge.


input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
            ^                      |
            |--------(B)-----------|
                     timers

(A) Input Element watermark: this is the watermark that promises there 
is no incoming element with a timestamp earlier than it. Each input 
element's timestamp holds this watermark. Note that *event time timers 
firing is according to this watermark*. But a runner commits changes 
to this watermark *whenever it wants*, in a way that can be 
consistent. So the runner can absolutely process *all* the elements 
before advancing the watermark (A), and only afterwards start firing 
timers.


(B) Timer watermark: this is a watermark that promises no timer is set 
with an output timestamp earlier than it. Each timer that has an 
output timestamp holds this watermark. Note that timers can set new 
timers, indefinitely, so this may never reach infinity even in a drain 
scenario.


(C) (derived) total input watermark: this is a watermark that is the 
minimum of the two above, and ensures that all state for the DoFn for 
expired windows can be GCd after calling @OnWindowExpiration.


(D) output watermark: this is a promise that the DoFn will not output 
earlier than the watermark. It is held by the total input watermark.
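
The relationship between watermarks (A) to (D) can be written down as a minimal sketch. Function names are illustrative only (not Beam APIs), timestamps are plain numbers, and treating a window as expired once (C) reaches its GC time is an assumption made for the example.

```python
import math


def timer_watermark(timer_output_timestamps):
    """(B): the minimum output timestamp over all set timers.

    Timers set without an output timestamp (None) do not hold this watermark.
    """
    held = [ts for ts in timer_output_timestamps if ts is not None]
    return min(held, default=math.inf)


def total_input_watermark(input_element_wm, timer_wm):
    """(C): the minimum of the input element watermark (A) and the timer watermark (B)."""
    return min(input_element_wm, timer_wm)


def output_watermark_upper_bound(input_element_wm, timer_wm):
    """(D) is held by (C), so it can never be ahead of it."""
    return total_input_watermark(input_element_wm, timer_wm)


def window_expired(window_gc_time, input_element_wm, timer_wm):
    """Assumption: a window's state can be GCd once (C) reaches its GC time."""
    return total_input_watermark(input_element_wm, timer_wm) >= window_gc_time
```

For example, with (A) at 100 and a single timer whose output timestamp is 50, a window with GC time 60 is not yet expired; if that timer had no output timestamp, (B) would be at infinity and the window could be collected.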


So any timer, processing or not, holds the total input watermark 
which prevents window GC, hence the timer must be fired. You can set 
timers without a timestamp and they will not hold (B) hence not hold 
the total input / GC watermark (C). Then if a timer fires for an 
expired window, it is ignored. But in general a timer that sets an 
output timestamp is saying that it may produce output, so it *must* be 
fired, even in batch, for data integrity. There was a time before 
timers had output timestamps that we said that you *always* have to 
have an @OnWindowExpiration callback for data integrity, and 
processing time timers could not hold the watermark. That is changed now.


One main purpose of processing time timers in streaming is to be a 
"timeout" for data buffered in state, to eventually flush. In this 
case the output timestamp should be the minimum of the elements in 
state (or equivalent). In batch, of course, this kind of timer is not 
relevant and we should definitely not wait for it, because the goal is 
to just get through all the data. We can justify this by saying that 
the worker really has no business having any idea what time it really 
is, and the runner can just run the clock at whatever speed it wants.


Another purpose, brought up on the Throttle thread, is to wait or 
backoff. In this case it would be desired for the timer to actually 
cause batch processing to pause 

Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

2024-02-21 Thread Jan Lukavský
Reasons we use Java serialization are not fundamental, probably only 
historical. Thinking about it, yes, there is a lucky coincidence that we 
currently have to change the serialization because of Flink 1.17 
support. Flink 1.17 actually removes the legacy java serialization from 
Flink and enforces custom serialization. Therefore, we need to introduce 
an upgrade compatible change of serialization to support Flink 1.17. 
This is already implemented in [1]. The PR can go further, though. We 
can replace Java serialization of Coder in the TypeSerializerSnapshot 
and use the portable representation of Coder (which will still use Java 
serialization in some cases, but might avoid it at least for well-known 
coders, moreover Coders should be more upgrade-stable classes).


I'll try to restore the SerializablePipelineOptions (copy) in 
FlinkRunner only and rework the serialization in a more stable way (at 
least avoid serializing the CoderTypeSerializer, which references the 
SerializablePipelineOptions).


I created [2] and marked it as blocker for 2.55.0 release, because 
otherwise we would break the upgrade.


Thanks for the discussion, it helped a lot.

 Jan

[1] https://github.com/apache/beam/pull/30197

[2] https://github.com/apache/beam/issues/30385

On 2/21/24 20:33, Kenneth Knowles wrote:
Yea I think we should restore the necessary classes but also fix the 
FlinkRunner. Java serialization is inherently self-update-incompatible.


On Wed, Feb 21, 2024 at 1:35 PM Reuven Lax via dev 
 wrote:


Is there a fundamental reason we serialize java classes into Flink
savepoints?

On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev
 wrote:

We could consider merging the gradle targets without renaming the
classpaths as an intermediate step.

Optimistically, perhaps there's a small number of classes that we need
to preserve (e.g. SerializablePipelineOptions looks like it was
something specifically intended to be serialized; maybe that and a
handful of others (that implement Serializable) could be left in their
original packages for backwards compatibility reasons)?

On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský 
wrote:
>
> Hi,
>
> while implementing FlinkRunner for Flink 1.17 I tried to
verify that a
> running Pipeline is able to successfully upgrade from Flink
1.16 to
> Flink 1.17. There is some change regarding serialization
needed for
> Flink 1.17, so this was a concern. Unfortunately recently we
merged
> core-construction-java into SDK, which resulted in some
classes being
> repackaged. Unfortunately, we serialize some classes into
Flink's
> check/savepoints. The renaming of the class therefore ends
with the
> following exception trying to restore from the savepoint:
>
> Caused by: java.lang.ClassNotFoundException:
>
org.apache.beam.runners.core.construction.SerializablePipelineOptions
>      at java.base/java.net
<http://java.net>.URLClassLoader.findClass(URLClassLoader.java:476)
>      at
java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>      at
>

org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>      at
>

org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>      at
>

org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>      at
java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>      at
>

org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>      at java.base/java.lang.Class.forName0(Native Method)
>      at java.base/java.lang.Class.forName(Class.java:398)
>      at
>

org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>      at
>

org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
>
>
> This means that no Pipeline will be able to successfully
upgrade from
> version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will
have to be
> restarted from scratch). I wanted to know how the community
would feel
> about that, this consequence probably was not clear when we
merged the
> artifacts. The only option would be to revert the merge and

Re: Throttle PTransform

2024-02-21 Thread Jan Lukavský


On 2/21/24 18:27, Reuven Lax via dev wrote:
Agreed, that event-time throttling doesn't make sense here. In theory 
processing-time timers have no SLA - i.e. their firing might be 
delayed - so batch runners aren't violating the model by firing them 
all at the end; however it does make processing time timers less 
useful in batch, as we see here.


Personally, I'm not sure I would use state and timers to implement 
this, and I definitely wouldn't create this many keys. A couple of 
reasons for this:
  1. If a pipeline is receiving input faster than the throttle rate, 
the proposed technique would shift all those elements into the DoFn's 
state which will keep growing indefinitely. Generally we would prefer 
to leave that backlog in the source instead of copying it into DoFn state.
  2. In my experience with throttling, having too much parallelism is 
problematic. The issue is that there is some error involved whenever 
you throttle, and this error can accumulate across many shards (and 
when I've done this sort of thing before, I found that the error was 
often biased in one direction). If targeting 100,000 records/sec, 
this approach (if I understand it correctly) would create 100,000 
shards and throttle them each to one element/sec. I doubt this will 
actually result in anything close to desired throttling.
  3. Very commonly, the request is to throttle based on bytes/sec, not 
events/sec. Anything we build should be easily extensible to bytes/sec.


What I would suggest (and what Beam users have often done in the past) 
would be to bucket the PCollection into N buckets where N is generally 
smallish (100 buckets, 1000 buckets, depending on the expected 
throughput); runners that support autosharding (such as Dataflow) can 
automatically choose N. Each shard then throttles its output to 
rate/N. Keeping N no larger than necessary minimizes the error 
introduced into throttling.
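
A minimal sketch of the bucketing arithmetic described above, in plain Python rather than Beam code. The function names, the CRC32-based key hashing, and the bucket count of 1000 are illustrative assumptions; only the 100,000 records/sec target comes from the message.

```python
import zlib


def shard_for(key: bytes, num_shards: int) -> int:
    """Deterministically map an element key to one of num_shards buckets."""
    return zlib.crc32(key) % num_shards


def per_shard_rate(target_rate: float, num_shards: int) -> float:
    """Each bucket throttles its output to an equal share of the global rate."""
    return target_rate / num_shards


TARGET_RATE = 100_000.0  # records/sec, the figure used in the message
NUM_SHARDS = 1_000       # hypothetical choice of N
shard_rate = per_shard_rate(TARGET_RATE, NUM_SHARDS)
example_shard = shard_for(b"user-42", NUM_SHARDS)
```

With N = 1000 each bucket only has to hold roughly 100 records/sec, instead of 100,000 buckets each holding one record/sec, so fewer independent throttles contribute error.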


We also don't necessarily need state/timers here - each shard is 
processed on a single thread, so those threads can simply throttle 
calls to OutputReceiver.output. This way if the pipeline is exceeding 
the threshold, backpressure will tend to simply leave excess data in 
the source. This also is a simpler design than the proposed one.
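
The in-thread throttling idea can be sketched roughly as follows. This is plain Python, not the Beam API: `ShardThrottle`, `FakeClock`, and the `output` callback (standing in for OutputReceiver.output) are hypothetical names, and the clock is injectable only so the pacing can be checked without real sleeping.

```python
import time


class ShardThrottle:
    """Paces emits on a single thread to at most `rate` elements per second."""

    def __init__(self, rate, clock=time.monotonic, sleep=time.sleep):
        self._interval = 1.0 / rate
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = clock()

    def emit(self, element, output):
        now = self._clock()
        start = max(now, self._next_allowed)
        if start > now:
            # Blocking the processing thread is what pushes back on the source.
            self._sleep(start - now)
        self._next_allowed = start + self._interval
        output(element)


class FakeClock:
    """Test double: sleeping just advances a counter."""

    def __init__(self):
        self.t = 0.0

    def now(self):
        return self.t

    def sleep(self, seconds):
        self.t += seconds


clock = FakeClock()
emitted = []
throttle = ShardThrottle(10.0, clock=clock.now, sleep=clock.sleep)
for i in range(5):
    throttle.emit(i, emitted.append)
```

No state or timers are involved: the only bookkeeping is the time of the next permitted emit, held in memory by the thread that owns the shard.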


A more sophisticated design might combine elements of both - buffering 
a bounded amount of data in state when the threshold is exceeded, but 
severely limiting the state size. However I wouldn't start here - we 
would want to build the simpler implementation first and see how it 
performs.

+1


On Wed, Feb 21, 2024 at 8:53 AM Robert Bradshaw via dev 
 wrote:


On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský  wrote:
>
> Hi,
>
> I have left a note regarding the proposed splitting of batch and
> streaming expansion of this transform. In general, a need for
such split
> triggers doubts in me. This signals that either
>
>   a) the transform does something it should not, or
>
>   b) Beam model is not complete in terms of being "unified"
>
> The problem that is described in the document is that in the
batch case
> timers are not fired appropriately.

+1. The underlying flaw is that processing time timers are not handled
correctly in batch, but should be (even if it means keeping workers
idle?). We should fix this.

> This is actually one of the
> motivations that led to introduction of @RequiresTimeSortedInput
> annotation and, though mentioned years ago as a question, I do not
> remember what arguments were used against enforcing sorting
inputs by
> timestamp in the batch stateful DoFn as a requirement in the
model. That
> would enable the appropriate firing of timers while preserving
the batch
> invariant, which is that no late data is allowed. IIRC there are
> runners that do this sorting by default (at least the sorting,
not sure
> about the timers, but once inputs are sorted, firing timers is
simple).
>
> A different question is if this particular transform should
maybe fire
> not by event time, but rather processing time?

Yeah, I was reading all of these as processing time. Throttling by
event time doesn't make much sense.

> On 2/21/24 03:00, Robert Burke wrote:
> > Thanks for the design Damon! And thanks for collaborating with
me on getting a high level textual description of the key
implementation idea down in writing. I think the solution is
pretty elegant.
> >
> > I do have concerns about how different Runners might handle
ProcessContinuations for the Bounded Input case. I know Dataflow
famously has two different execution modes under the hood, but I
agree with the principle that ProcessContinuation.Resume should
largely be in line with the expected delay, though it's by no
means

Re: Throttle PTransform

2024-02-21 Thread Jan Lukavský



On 2/21/24 17:52, Robert Bradshaw via dev wrote:

On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský  wrote:

Hi,

I have left a note regarding the proposed splitting of batch and
streaming expansion of this transform. In general, a need for such split
triggers doubts in me. This signals that either

   a) the transform does something it should not, or

   b) Beam model is not complete in terms of being "unified"

The problem that is described in the document is that in the batch case
timers are not fired appropriately.

+1. The underlying flaw is that processing time timers are not handled
correctly in batch, but should be (even if it means keeping workers
idle?). We should fix this.


This is actually one of the
motivations that led to introduction of @RequiresTimeSortedInput
annotation and, though mentioned years ago as a question, I do not
remember what arguments were used against enforcing sorting inputs by
timestamp in the batch stateful DoFn as a requirement in the model. That
would enable the appropriate firing of timers while preserving the batch
invariant, which is that no late data is allowed. IIRC there are
runners that do this sorting by default (at least the sorting, not sure
about the timers, but once inputs are sorted, firing timers is simple).

A different question is if this particular transform should maybe fire
not by event time, but rather processing time?

Yeah, I was reading all of these as processing time. Throttling by
event time doesn't make much sense.
Yeah, I was not aware that we do not trigger processing time timers in 
batch case, which is why I was under the wrong impression that the 
document talks about event time timers, which really makes little sense. 
I think generally it should be possible to correctly fire both event and 
processing time timers in the batch case.



On 2/21/24 03:00, Robert Burke wrote:

Thanks for the design Damon! And thanks for collaborating with me on getting a 
high level textual description of the key implementation idea down in writing. 
I think the solution is pretty elegant.

I do have concerns about how different Runners might handle 
ProcessContinuations for the Bounded Input case. I know Dataflow famously has 
two different execution modes under the hood, but I agree with the principle 
that ProcessContinuation.Resume should largely be in line with the expected 
delay, though it's by no means guaranteed AFAIK.

We should also ensure this is linked from https://s.apache.org/beam-design-docs 
if not already.

Robert Burke
Beam Go Busybody

On 2024/02/20 14:00:00 Damon Douglas wrote:

Hello Everyone,

The following describes a Throttle PTransform that holds element throughput
to minimize downstream API overusage. Thank you for reading and your
valuable input.

https://s.apache.org/beam-throttle-transform

Best,

Damon



Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

2024-02-21 Thread Jan Lukavský

Hi,

while implementing FlinkRunner for Flink 1.17 I tried to verify that a 
running Pipeline is able to successfully upgrade from Flink 1.16 to 
Flink 1.17. There is some change regarding serialization needed for 
Flink 1.17, so this was a concern. Unfortunately recently we merged 
core-construction-java into SDK, which resulted in some classes being 
repackaged. Unfortunately, we serialize some classes into Flink's 
check/savepoints. The renaming of the class therefore ends with the 
following exception trying to restore from the savepoint:


Caused by: java.lang.ClassNotFoundException: 
org.apache.beam.runners.core.construction.SerializablePipelineOptions

    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
    at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)

    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at 
org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)

    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at 
org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)



This means that no Pipeline will be able to successfully upgrade from 
version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to be 
restarted from scratch). I wanted to know how the community would feel 
about that, this consequence probably was not clear when we merged the 
artifacts. The only option would be to revert the merge and then try to 
figure out how to avoid Java serialization in Flink's savepoints. That 
would definitely be costly in terms of implementation and even more to 
provide ways to transfer old savepoints to the new format (can be 
possible using state processor API). I'm aware that Beam provides no 
general guarantees about the upgrade compatibility, so it might be fine 
to just ignore this, I just wanted to shout this out loud so that we can 
make a deliberate decision.
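
As an analogy only (this is Python's pickle, not Java serialization or Flink's savepoint format), the sketch below shows the same failure mode: a serialized snapshot records the fully qualified name of the class, so moving the class to another package makes old snapshots unreadable. The module and class names are hypothetical.

```python
import pickle
import sys
import types


def make_module(name):
    """Register a throwaway module holding one class, standing in for a package."""
    module = types.ModuleType(name)
    module.SerializableOptions = type(
        "SerializableOptions", (), {"__module__": name}
    )
    sys.modules[name] = module
    return module


# "Old release": the class lives in beam_demo_old_pkg and is written to a snapshot.
old = make_module("beam_demo_old_pkg")
snapshot = pickle.dumps(old.SerializableOptions())

# "New release": the class was repackaged, the old package no longer exists.
del sys.modules["beam_demo_old_pkg"]
make_module("beam_demo_new_pkg")

try:
    pickle.loads(snapshot)
    restored = True
except ModuleNotFoundError:
    # The snapshot still refers to beam_demo_old_pkg.SerializableOptions.
    restored = False
```

The restore fails even though an identical class exists under the new name, which mirrors the ClassNotFoundException in the stack trace above.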


Best,

 Jan



Re: Throttle PTransform

2024-02-21 Thread Jan Lukavský

Hi,

I have left a note regarding the proposed splitting of batch and 
streaming expansion of this transform. In general, a need for such split 
triggers doubts in me. This signals that either


 a) the transform does something it should not, or

 b) Beam model is not complete in terms of being "unified"

The problem that is described in the document is that in the batch case 
timers are not fired appropriately. This is actually one of the 
motivations that led to introduction of @RequiresTimeSortedInput 
annotation and, though mentioned years ago as a question, I do not 
remember what arguments were used against enforcing sorting inputs by 
timestamp in the batch stateful DoFn as a requirement in the model. That 
would enable the appropriate firing of timers while preserving the batch 
invariant, which is that no late data is allowed. IIRC there are 
runners that do this sorting by default (at least the sorting, not sure 
about the timers, but once inputs are sorted, firing timers is simple).
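
To illustrate why firing timers becomes simple once inputs are sorted, here is a rough sketch for a single key in plain Python. It is not Beam code: `run_batch_sorted`, `process`, and `on_timer` are hypothetical names, and it ignores timers that set further timers.

```python
import heapq


def run_batch_sorted(elements, process, on_timer):
    """Process (timestamp, value) pairs for one key in event-time order.

    process(ts, value) returns the timestamps of any timers it sets.
    A pending timer fires before the first element at or after its timestamp.
    """
    timers = []
    for ts, value in sorted(elements, key=lambda e: e[0]):
        while timers and timers[0] <= ts:
            on_timer(heapq.heappop(timers))
        for timer_ts in process(ts, value):
            heapq.heappush(timers, timer_ts)
    # End of bounded input: everything still pending fires in order.
    while timers:
        on_timer(heapq.heappop(timers))


log = []


def record_and_set_timer(ts, value):
    log.append(("element", ts, value))
    return [ts + 5]  # each element sets a timer 5 time units later


run_batch_sorted(
    [(10, "b"), (1, "a"), (7, "c")],
    record_and_set_timer,
    lambda timer_ts: log.append(("timer", timer_ts)),
)
```

Because the input is sorted, no element can arrive behind an already fired timer, so the "no late data" batch invariant holds by construction.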


A different question is if this particular transform should maybe fire 
not by event time, but rather processing time?


Best,
 Jan

On 2/21/24 03:00, Robert Burke wrote:

Thanks for the design Damon! And thanks for collaborating with me on getting a 
high level textual description of the key implementation idea down in writing. 
I think the solution is pretty elegant.

I do have concerns about how different Runners might handle 
ProcessContinuations for the Bounded Input case. I know Dataflow famously has 
two different execution modes under the hood, but I agree with the principle 
that ProcessContinuation.Resume should largely be in line with the expected 
delay, though it's by no means guaranteed AFAIK.

We should also ensure this is linked from https://s.apache.org/beam-design-docs 
if not already.

Robert Burke
Beam Go Busybody

On 2024/02/20 14:00:00 Damon Douglas wrote:

Hello Everyone,

The following describes a Throttle PTransform that holds element throughput
to minimize downstream API overusage. Thank you for reading and your
valuable input.

https://s.apache.org/beam-throttle-transform

Best,

Damon



Re: [ANNOUNCE] New Committer: Svetak Sundhar

2024-02-15 Thread Jan Lukavský

Congrats Svetak!

On 2/14/24 16:11, Yi Hu via dev wrote:

Congrats, Svetak!

On Wed, Feb 14, 2024 at 9:50 AM John Casey via dev 
 wrote:


Congrats Svetak!

On Wed, Feb 14, 2024 at 9:00 AM Ahmed Abualsaud
 wrote:

Congrats Svetak!

On 2024/02/14 02:05:02 Priyans Desai via dev wrote:
> Congratulations Svetak!!
>
> On Tue, Feb 13, 2024 at 8:09 PM Chamikara Jayalath via dev <
> dev@beam.apache.org> wrote:
>
> > Congrats Svetak!
> >
> > On Tue, Feb 13, 2024 at 4:39 PM Svetak Sundhar via dev <
> > dev@beam.apache.org> wrote:
> >
> >> Thanks everyone!! Looking forward to the continued
collaboration :)
> >>
> >>
> >> Svetak Sundhar
> >>
> >>   Data Engineer
> >> s vetaksund...@google.com
> >>
> >>
> >>
> >> On Mon, Feb 12, 2024 at 9:58 PM Byron Ellis via dev

> >> wrote:
> >>
> >>> Congrats Svetak!
> >>>
> >>> On Mon, Feb 12, 2024 at 6:57 PM Shunping Huang via dev <
> >>> dev@beam.apache.org> wrote:
> >>>
>  Congratulations, Svetak!
> 
>  On Mon, Feb 12, 2024 at 9:50 PM XQ Hu via dev

>  wrote:
> 
> > Great job, Svetak! Thanks for all your contributions
to Beam!!!
> >
> > On Mon, Feb 12, 2024 at 4:44 PM Valentyn Tymofieiev
via dev <
> > dev@beam.apache.org> wrote:
> >
> >> Congrats, Svetak!
> >>
> >> On Mon, Feb 12, 2024 at 11:20 AM Kenneth Knowles

> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> Please join me and the rest of the Beam PMC in
welcoming a new
> >>> committer: Svetak Sundhar (sve...@apache.org).
> >>>
> >>> Svetak has been with Beam since 2021. Svetak has
contributed code to
> >>> many areas of Beam, including notebooks, Beam Quest,
dataframes, and IOs.
> >>> We also want to especially highlight the effort
Svetak has put into
> >>> improving Beam's documentation, participating in
release validation, and
> >>> evangelizing Beam.
> >>>
> >>> Considering his contributions to the project over
this timeframe,
> >>> the Beam PMC trusts Svetak with the responsibilities
of a Beam committer.
> >>> [1]
> >>>
> >>> Thank you Svetak! And we are looking to see more of your
> >>> contributions!
> >>>
> >>> Kenn, on behalf of the Apache Beam PMC
> >>>
> >>> [1]
> >>>
> >>>

https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
> >>>
> >>
>


Re: [VOTE] Release 2.54.0, release candidate #2

2024-02-07 Thread Jan Lukavský

+1 (binding)

Validated Java SDK with Flink runner.

 Jan

On 2/7/24 06:23, Robert Burke via dev wrote:

Hi everyone,
Please review and vote on the release candidate #2 for the version 2.54.0,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


Reviewers are encouraged to test their own use cases with the release
candidate, and vote +1 if
no issues are found. Only PMC member votes will count towards the final
vote, but votes from all
community members are encouraged and helpful for finding regressions; you
can either test your own
use cases [13] or use cases from the validation sheet [10].

The complete staging area is available for your review, which includes:
* GitHub Release notes [1],
* the official Apache source release to be deployed to dist.apache.org 
 [2],

which is signed with the key with fingerprint D20316F712213422 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "v2.54.0-RC2" [5],
* website pull request listing the release [6], the blog post [6], and
publishing the API reference manual [7].
* Python artifacts are deployed along with the source release to the
dist.apache.org  [2] and PyPI[8].
* Go artifacts and documentation are available at pkg.go.dev 
 [9]

* Validation sheet with a tab for 2.54.0 release to help with validation
[10].
* Docker images published to Docker Hub [11].
* PR to run tests against release branch [12].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

For guidelines on how to try the release in your projects, check out 
our RC

testing guide [13].

Thanks,
Robert Burke
Beam 2.54.0 Release Manager

[1] https://github.com/apache/beam/milestone/18?closed=1
[2] https://dist.apache.org/repos/dist/dev/beam/2.54.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1368/
[5] https://github.com/apache/beam/tree/v2.54.0-RC2
[6] https://github.com/apache/beam/pull/30201
[7] https://github.com/apache/beam-site/pull/659
[8] https://pypi.org/project/apache-beam/2.54.0rc2/
[9]
https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.54.0-RC2/go/pkg/beam
[10]
https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=28763708
[11] https://hub.docker.com/search?q=apache%2Fbeam=image 


[12] https://github.com/apache/beam/pull/30104
[13]
https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md

Re: [DESIGN PROPOSAL] Reshuffle Allowing Duplicates

2024-01-31 Thread Jan Lukavský

Hi,

if I understand this proposal correctly, the motivation is actually 
reducing latency by bypassing bundle atomic guarantees, bundles after 
"at least once" Reshuffle would be reconstructed independently of the 
pre-shuffle bundling. Provided this is correct, it seems that the 
behavior is slightly more general than for the case of Reshuffle. We 
have already some transforms that manipulate a specific property of a 
PCollection - if it may or might not contain duplicates. That is 
manipulated in two ways - explicitly removing duplicates based on IDs on 
sources that generate duplicates and using @RequiresStableInput, mostly 
in sinks. These techniques modify an inherent property of a PCollection, 
that is if it contains or does not contain possible duplicates 
originating from the same input element.


There are two types of duplicates - duplicate elements in _different 
bundles_ (typically from at-least-once sources) and duplicates arising 
due to bundle reprocessing (affecting only transforms with side-effects, 
that is what we solve by @RequiresStableInput). The point I'm trying to 
get to - should we add these properties to PCollections (contains 
cross-bundle duplicates vs. does not) and PTransforms ("outputs 
deduplicated elements" and "requires stable input")? That would allow us 
to analyze the Pipeline DAG and provide appropriate implementation for 
Reshuffle automatically, so that a new URN or flag would not be needed. 
Moreover, this might be useful for a broader range of optimizations.


WDYT?

 Jan

On 1/30/24 23:22, Robert Burke wrote:
Is the benefit of this proposal just the bounded deviation from the 
existing reshuffle?


Reshuffle is already rather dictated by arbitrary runner choice, from 
simply ignoring the node, to forcing a materialization break, to a 
full shuffle implementation which has additional side effects.


But model wise I don't believe it guarantees specific checkpointing or 
re-execution behavior as currently specified. The proto only says it 
represents the operation (without specifying the behavior, that is a 
big problem).


I guess my concern here is that it implies/codifies that the existing 
reshuffle has more behavior than it promises outside of the Java SDK.


"Allowing duplicates" WRT reshuffle is tricky. It feels like mostly 
allows an implementation that may mean the inputs into the reshuffle 
might be re-executed for example. But that's always under the runner's 
discretion, and ultimately it could also prevent even getting the 
intended benefit of a reshuffle (notionally, just a fusion break).


Is there even a valid way to implement the notion of a reshuffle that 
leads to duplicates outside of a retry/resilience case?


---

To be clear, I'm not against the proposal. I'm against its being 
built on a non-existent foundation. If the behavior isn't already 
defined, it's impossible to specify a real deviation from it.


I'm all for more specific behaviors if it means we actually clarify what 
the original version is in the protos, since it's news to me (just 
now, because I looked) that the Java reshuffle promises GBK-like side 
effects. But that's a long deprecated transform without a satisfying 
replacement for its usage, so it may be moot.


Robert Burke



On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles  wrote:

Hi all,

Just when you thought I had squeezed all the possible interest out
of this most boring-seeming of transforms :-)

I wrote up a very quick proposal as a doc [1]. It is short enough
that I will also put the main idea and main question in this email
so you can quickly read. Best to put comments in the doc.

Main idea: add a variation of Reshuffle that allows duplicates,
aka "at least once", so that users and runners can benefit from
efficiency if it is possible

Main question: is it best as a parameter to existing reshuffle
transforms or as new URN(s)? I have proposed it as a parameter but
I think either one could work.

I would love feedback on the main idea, main question, or anywhere
on the doc.

Thanks!

Kenn

[1] https://s.apache.org/beam-reshuffle-allowing-duplicates


Re: @RequiresTimeSortedInput adoption by runners

2024-01-20 Thread Jan Lukavský
've suggested 
the Java SDK could do, but it avoids the runner needing to be aware of a 
specific implementation requirement around a feature it doesn't support.  If it 
has to do something specific to support an SDK specific mechanism, that's still 
supporting the feature, but I fear it's not a great road to tread on for 
runners to add SDK specific implementation details.

If a (portable) runner is going to spend work on doing something to handle 
RequiresTimeSortedInput, it's probably easier to handle it generally than to 
try to enable a Java specific work around. I'm not even sure how that could 
work since the SDK would then need a special interpretation of what a runner 
sent back for it to do any SDK side special backup handling, vs the simple 
execution of the given transform.

It's entirely possible I've over simplified the "fallback" protocol described 
above, so this thread is still useful for my Prism work, especially if I see any similar 
situations once I start on the Java Validates Runner suite.

Robert Burke
Beam Go Busybody

On Fri, Jan 19, 2024, 6:41 AM Jan Lukavský  wrote:

I was primarily focused on Java SDK (and core-construction-java), but generally 
speaking, any SDK can provide default expansion that runners can use so that it 
is not (should not be) required to implement this manually.
Currently, in Java SDK, the annotation is wired up into StatefulDoFnRunner, 
which (as name suggests) can be used for running stateful DoFns. The problem is 
that not every runner is using this facility. Java SDK generally supports 
providing default expansions of transforms, but _only for transforms that do 
not have to work with dynamic state_. This is not the case for this annotation 
- a default implementation for @RequiresTimeSortedInput has to take another 
DoFn as input, and wire its lifecycle in a way that elements are buffered in 
(dynamically created) buffer and fed into the downstream DoFn only when timer 
fires.
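
The buffering scheme described above can be sketched as follows. This is plain Python, not the StatefulDoFnRunner implementation: `TimeSortedBuffer`, the `downstream` callback (standing in for the wrapped DoFn), and the `on_watermark` hook (standing in for a timer firing) are all hypothetical names.

```python
import heapq
import itertools


class TimeSortedBuffer:
    """Buffers elements and releases them downstream in timestamp order."""

    def __init__(self, downstream):
        self._downstream = downstream
        self._heap = []
        # Tie-breaker so equal timestamps keep arrival order and values are never compared.
        self._seq = itertools.count()

    def add(self, timestamp, value):
        heapq.heappush(self._heap, (timestamp, next(self._seq), value))

    def on_watermark(self, watermark):
        # Only elements strictly before the watermark are safe to release:
        # an element exactly at the watermark could still be followed by an equal one.
        while self._heap and self._heap[0][0] < watermark:
            ts, _, value = heapq.heappop(self._heap)
            self._downstream(ts, value)


seen = []
buf = TimeSortedBuffer(lambda ts, value: seen.append((ts, value)))
buf.add(5, "b")
buf.add(2, "a")
buf.add(9, "c")
buf.on_watermark(6)   # releases timestamps 2 and 5
buf.add(7, "d")
buf.on_watermark(10)  # releases timestamps 7 and 9
```

The downstream callback sees elements in event-time order even though they arrived out of order, at the cost of holding them until the watermark passes.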

If I narrow down my line of thinking, it would be possible to:
  a) create something like "dynamic pipeline expansion", which would make it 
possible to work with PTransforms in this way (probably would require some ByteBuddy magic)
  b) wire this up to DoFnInvoker, which takes DoFn and creates class that is 
used by runners for feeding data

Option b) would ensure that actually all runners support such expansion, but 
seems to be somewhat hacky and too specific to this case. Moreover, it would 
require knowledge if the expansion is actually required by the runner (e.g. if 
the annotation is supported explicitly - most likely for batch execution). 
Therefore I'd be in favor of option a), this might be reusable by a broader 
range of default expansions.

In other SDKs than Java this might have different implications, the reason why 
it is somewhat more complicated to do dynamic (or generic?) expansions of 
PTransforms in Java is mostly due to how DoFns are implemented in terms of 
annotations and the DoFnInvokers involved for efficiency.

  Jan

On 1/18/24 18:35, Robert Burke wrote:

I agree that variable support across Runners does limit the adoption of a 
feature.  But it's also then limited if the SDKs and their local / direct 
runners don't yet support the feature. The Go SDK doesn't currently have a way 
of specifying that annotation, preventing use.  (The lack of mention of the 
Python direct runner your list implies it's not yet supported by the Python 
SDK, and a quick search shows that's likely [0])

While not yet widely available to the other SDKs, Prism, the new Go SDK Local 
Runner, maintains data in event time sorted heaps [1]. The intent was to 
implement the annotation (among other features) once I start running the Java 
and Python Validates Runner suites against it.

I think stateful transforms are getting the event ordering on values for "free" 
as a result [2], but there's no special/behavior at present if the DoFn is consuming the 
result of a Group By Key.

Part of the issue is that by definition, a GBK "loses" the timestamps of the 
values, and doesn't emit them, outside of using them to determine the resulting timestamp 
of the Key... [3]. To make use of the timestamp in the aggregation stage a runner would 
need to do something different in the GBK, namely sorting by the timestamp as the data is 
ingested, and keeping that timestamp around to continue the sort. This prevents a more 
efficient implementation of directly arranging the received element bytes into the 
Iterator format, requiring a post process filtering. Not hard, but a little dissatisfying.

Skimming through the discussion, I agree with the general utility goal of the 
annotation, but as with many Beam features, there may be a discoverability 
problem. The feature isn't mentioned in the Programming Guide (AFAICT), and 
trying to find anything on the beam site, the top result is the Javadoc for the 
annotation (which is good, but you still need to know to look fo

Re: @RequiresTimeSortedInput adoption by runners

2024-01-19 Thread Jan Lukavský
epo%3Aapache%2Fbeam+TIME_SORTED_INPUT+language%3APython=code

[1]https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L93

[2]https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1094

[3]https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1132

[4]https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+OrderedListState

[5]https://github.com/search?q=repo%3Aapache%2Fbeam+RequiresTimeSortedInput=code=2

[6]https://github.com/apache/beam/blob/b4c23b32f2b80ce052c8a235e5064c69f37df992/website/www/site/content/en/blog/beam-2.20.0.md?plain=1#L46

On 2024/01/18 16:14:56 Jan Lukavský wrote:

Hi,

recently I came across the fact that most runners do not support
@RequiresTimeSortedInput annotation for sorting per-key data by event
timestamp [1]. Actually, runners supporting it seem to be Direct java,
Flink and Dataflow batch (as it is a noop there). The annotation has
use-cases in time-series data processing, in transaction processing and
more. Though it is absolutely possible to implement the time-sorting
manually (e.g. [2]), this is actually efficient only in streaming mode,
in batch mode the runner typically wants to leverage the internal
sort-grouping it already does.

The original idea was to implement this annotation inside
StatefulDoFnRunner, which would be used by majority of runners. It turns
out that this is not the case. The question now is, should we use an
alternative place to implement the annotation (e.g. Pipeline expansion,
or DoFnInvoker) so that more runners can benefit from it automatically
(at least for streaming case, batch case needs to be implemented
manually)? Does the community find the annotation useful? I'm linking a
rather old (and long :)) thread that preceded introduction of the
annotation [3] for more context.

I sense the current adoption of the annotation by runners makes it
somewhat useless.

Looking forward to any comments on this.

Best,

   Jan

[1]
https://beam.apache.org/releases/javadoc/2.53.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html

[2]
https://cloud.google.com/spanner/docs/change-streams/use-dataflow#order-by-key

[3]https://lists.apache.org/thread/bkl9kk8l44xw2sw08s7m54k1wsc3n4tn



@RequiresTimeSortedInput adoption by runners

2024-01-18 Thread Jan Lukavský

Hi,

recently I came across the fact that most runners do not support 
@RequiresTimeSortedInput annotation for sorting per-key data by event 
timestamp [1]. Actually, runners supporting it seem to be Direct java, 
Flink and Dataflow batch (as it is a noop there). The annotation has 
use-cases in time-series data processing, in transaction processing and 
more. Though it is absolutely possible to implement the time-sorting 
manually (e.g. [2]), this is actually efficient only in streaming mode, 
in batch mode the runner typically wants to leverage the internal 
sort-grouping it already does.


The original idea was to implement this annotation inside 
StatefulDoFnRunner, which would be used by majority of runners. It turns 
out that this is not the case. The question now is, should we use an 
alternative place to implement the annotation (e.g. Pipeline expansion, 
or DoFnInvoker) so that more runners can benefit from it automatically 
(at least for streaming case, batch case needs to be implemented 
manually)? Does the community find the annotation useful? I'm linking a 
rather old (and long :)) thread that preceded introduction of the 
annotation [3] for more context.


I sense the current adoption of the annotation by runners makes it 
somewhat useless.


Looking forward to any comments on this.

Best,

 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.53.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html


[2] 
https://cloud.google.com/spanner/docs/change-streams/use-dataflow#order-by-key


[3] https://lists.apache.org/thread/bkl9kk8l44xw2sw08s7m54k1wsc3n4tn



Re: [VOTE] Release 2.53.0, release candidate #2

2023-12-28 Thread Jan Lukavský

+1 (binding)

Tested Java SDK with Flink Runner.

 Jan

On 12/27/23 14:13, Danny McCormick via dev wrote:

+1 (non-binding)

Tested with some example ML notebooks.

Thanks,
Danny

On Tue, Dec 26, 2023 at 6:41 PM XQ Hu via dev  wrote:

+1 (non-binding)

Tested with the simple RunInference pipeline:

https://github.com/google/dataflow-ml-starter/actions/runs/7332832875/job/19967521369

On Tue, Dec 26, 2023 at 3:29 PM Jack McCluskey via dev
 wrote:

Happy holidays everyone,

Please review and vote on the release candidate #2 for the
version 2.53.0, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific
comments)

Reviewers are encouraged to test their own use cases with the
release candidate, and vote +1 if no issues are found. Only
PMC member votes will count towards the final vote, but votes
from all community members are encouraged and helpful for
finding regressions; you can either test your own use cases
[13] or use cases from the validation sheet [10].

The complete staging area is available for your review, which
includes:
* GitHub Release notes [1],
* the official Apache source release to be deployed to
dist.apache.org  [2], which is signed
with the key with fingerprint DF3CBA4F3F4199F4
(D20316F712213422 if automated) [3],
* all artifacts to be deployed to the Maven Central Repository
[4],
* source code tag "v2.53.0-RC2" [5],
* website pull request listing the release [6], the blog post
[6], and publishing the API reference manual [7].
* Python artifacts are deployed along with the source release
to the dist.apache.org  [2] and PyPI[8].
* Go artifacts and documentation are available at pkg.go.dev
 [9]
* Validation sheet with a tab for 2.53.0 release to help with
validation [10].
* Docker images published to Docker Hub [11].
* PR to run tests against release branch [12].

The vote will be open for at least 72 hours. It is adopted by
majority approval, with at least 3 PMC affirmative votes.

For guidelines on how to try the release in your projects,
check out our RC testing guide [13].

Thanks,

Jack McCluskey

[1] https://github.com/apache/beam/milestone/17
[2] https://dist.apache.org/repos/dist/dev/beam/2.53.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4]
https://repository.apache.org/content/repositories/orgapachebeam-1365/
[5] https://github.com/apache/beam/tree/v2.53.0-RC2
[6] https://github.com/apache/beam/pull/29856
[7] https://github.com/apache/beam-site/pull/657
[8] https://pypi.org/project/apache-beam/2.53.0rc2/
[9] https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.53.0-RC2/go/pkg/beam
[10] https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1290249774
[11] https://hub.docker.com/search?q=apache%2Fbeam=image
[12] https://github.com/apache/beam/pull/29758
[13] https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md


-- 




Jack McCluskey
SWE - DataPLS PLAT/ Dataflow ML
RDU
jrmcclus...@google.com



Re: [VOTE] Release 2.52.0, release candidate #5

2023-11-15 Thread Jan Lukavský

+1 (binding)

Validated Java SDK with Flink runner on own use cases.

 Jan

On 11/15/23 11:35, Jean-Baptiste Onofré wrote:

+1 (binding)

Quickly tested Java SDK and checked the legal part (hash, signatures, headers).

Regards
JB

On Tue, Nov 14, 2023 at 12:06 AM Danny McCormick via dev
 wrote:

Hi everyone,
Please review and vote on the release candidate #5 for the version 2.52.0, as 
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


Reviewers are encouraged to test their own use cases with the release 
candidate, and vote +1 if no issues are found. Only PMC member votes will count 
towards the final vote, but votes from all community members are encouraged and 
helpful for finding regressions; you can either test your own use cases or use 
cases from the validation sheet [10].

The complete staging area is available for your review, which includes:

GitHub Release notes [1]
the official Apache source release to be deployed to dist.apache.org [2], which 
is signed with the key with fingerprint D20316F712213422 [3]
all artifacts to be deployed to the Maven Central Repository [4]
source code tag "v2.52.0-RC5" [5]
website pull request listing the release [6], the blog post [6], and publishing 
the API reference manual [7]
Python artifacts are deployed along with the source release to the 
dist.apache.org [2] and PyPI[8].
Go artifacts and documentation are available at pkg.go.dev [9]
Validation sheet with a tab for 2.52.0 release to help with validation [10]
Docker images published to Docker Hub [11]
PR to run tests against release branch [12]


The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.

For guidelines on how to try the release in your projects, check out our blog 
post at https://beam.apache.org/blog/validate-beam-release/.

Thanks,
Danny

[1] https://github.com/apache/beam/milestone/16
[2] https://dist.apache.org/repos/dist/dev/beam/2.52.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1363/
[5] https://github.com/apache/beam/tree/v2.52.0-RC5
[6] https://github.com/apache/beam/pull/29331
[7] https://github.com/apache/beam-site/pull/655
[8] https://pypi.org/project/apache-beam/2.52.0rc5/
[9] https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.52.0-RC5/go/pkg/beam
[10] 
https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1387982510
[11] https://hub.docker.com/search?q=apache%2Fbeam=image
[12] https://github.com/apache/beam/pull/29418


Re: [VOTE] Release 2.52.0, release candidate #4

2023-11-13 Thread Jan Lukavský

+1 (binding)

Validated Java SDK with Flink runner on own use cases.

 Jan

On 11/12/23 00:44, Danny McCormick via dev wrote:

Hi everyone,
Please review and vote on the release candidate #4 for the version 
2.52.0, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


Reviewers are encouraged to test their own use cases with the release 
candidate, and vote +1 if no issues are found. Only PMC member votes 
will count towards the final vote, but votes from all community 
members are encouraged and helpful for finding regressions; you can 
either test your own use cases or use cases from the validation sheet 
[10].


The complete staging area is available for your review, which includes:

  * GitHub Release notes [1]
  * the official Apache source release to be deployed to
dist.apache.org  [2], which is signed
with the key with fingerprint D20316F712213422 [3]
  * all artifacts to be deployed to the Maven Central Repository [4]
  * source code tag "v2.52.0-RC4" [5]
  * website pull request listing the release [6], the blog post [6],
and publishing the API reference manual [7]
  * Python artifacts are deployed along with the source release to the
dist.apache.org  [2] and PyPI[8].
  * Go artifacts and documentation are available at pkg.go.dev
 [9]
  * Validation sheet with a tab for 2.52.0 release to help with
validation [10]
  * Docker images published to Docker Hub [11]
  * PR to run tests against release branch [12]


The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.


For guidelines on how to try the release in your projects, check out 
our blog post at https://beam.apache.org/blog/validate-beam-release/.


Thanks,
Danny

[1] https://github.com/apache/beam/milestone/16
[2] https://dist.apache.org/repos/dist/dev/beam/2.52.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1362/
[5] https://github.com/apache/beam/tree/v2.52.0-RC4
[6] https://github.com/apache/beam/pull/29331
[7] https://github.com/apache/beam-site/pull/654
[8] https://pypi.org/project/apache-beam/2.52.0rc4/
[9] 
https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.52.0-RC4/go/pkg/beam
[10] 
https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1387982510
[11] https://hub.docker.com/search?q=apache%2Fbeam=image 


[12] https://github.com/apache/beam/pull/29404

Re: [VOTE] Release 2.52.0, release candidate #3

2023-11-09 Thread Jan Lukavský

+1 (binding)

Validated Java SDK with Flink runner on own use cases.

 Jan

On 11/9/23 03:31, Danny McCormick via dev wrote:

Hi everyone,
Please review and vote on the release candidate #3 for the version 
2.52.0, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


Reviewers are encouraged to test their own use cases with the release 
candidate, and vote +1 if no issues are found. Only PMC member votes 
will count towards the final vote, but votes from all community 
members are encouraged and helpful for finding regressions; you can 
either test your own use cases or use cases from the validation sheet 
[10].


The complete staging area is available for your review, which includes:

  * GitHub Release notes [1]
  * the official Apache source release to be deployed to
dist.apache.org  [2], which is signed
with the key with fingerprint D20316F712213422 [3]
  * all artifacts to be deployed to the Maven Central Repository [4]
  * source code tag "v2.52.0-RC3" [5]
  * website pull request listing the release [6], the blog post [6],
and publishing the API reference manual [7]
  * Python artifacts are deployed along with the source release to the
dist.apache.org  [2] and PyPI[8].
  * Go artifacts and documentation are available at pkg.go.dev
 [9]
  * Validation sheet with a tab for 2.52.0 release to help with
validation [10]
  * Docker images published to Docker Hub [11]
  * PR to run tests against release branch [12]


The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.


For guidelines on how to try the release in your projects, check out 
our blog post at https://beam.apache.org/blog/validate-beam-release/.


Thanks,
Danny

[1] https://github.com/apache/beam/milestone/16
[2] https://dist.apache.org/repos/dist/dev/beam/2.52.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1361/
[5] https://github.com/apache/beam/tree/v2.52.0-RC3
[6] https://github.com/apache/beam/pull/29331
[7] https://github.com/apache/beam-site/pull/653
[8] https://pypi.org/project/apache-beam/2.52.0rc2/
[9] 
https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.52.0-RC3/go/pkg/beam
[10] 
https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1387982510
[11] https://hub.docker.com/search?q=apache%2Fbeam=image 


[12] https://github.com/apache/beam/pull/29319

Re: [VOTE] Release 2.52.0, release candidate #2

2023-11-08 Thread Jan Lukavský

+1 (binding)

Validated Java SDK with Flink runner on own use cases.

 Jan

On 11/8/23 00:24, Danny McCormick via dev wrote:

Hi everyone,
Please review and vote on the release candidate #2 for the version 
2.52.0, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


Reviewers are encouraged to test their own use cases with the release 
candidate, and vote +1 if no issues are found. Only PMC member votes 
will count towards the final vote, but votes from all community 
members are encouraged and helpful for finding regressions; you can 
either test your own use cases or use cases from the validation sheet 
[10].


The complete staging area is available for your review, which includes:

  * GitHub Release notes [1]
  * the official Apache source release to be deployed to
dist.apache.org  [2], which is signed with
the key with fingerprint D20316F712213422 [3]
  * all artifacts to be deployed to the Maven Central Repository [4]
  * source code tag "v2.52.0-RC2" [5]
  * website pull request listing the release [6], the blog post [6],
and publishing the API reference manual [7]
  * Python artifacts are deployed along with the source release to the
dist.apache.org  [2] and PyPI[8].
  * Go artifacts and documentation are available at pkg.go.dev
 [9]
  * Validation sheet with a tab for 2.52.0 release to help with
validation [10]
  * Docker images published to Docker Hub [11]
  * PR to run tests against release branch [12]


The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.


For guidelines on how to try the release in your projects, check out 
our blog post at https://beam.apache.org/blog/validate-beam-release/.


Thanks,
Danny

[1] https://github.com/apache/beam/milestone/16
[2] https://dist.apache.org/repos/dist/dev/beam/2.52.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1360/
[5] https://github.com/apache/beam/tree/v2.52.0-RC2
[6] https://github.com/apache/beam/pull/29331
[7] https://github.com/apache/beam-site/pull/652
[8] https://pypi.org/project/apache-beam/2.52.0rc2/
[9] 
https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.52.0-RC2/go/pkg/beam
[10] 
https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1387982510
[11] https://hub.docker.com/search?q=apache%2Fbeam=image 


[12] https://github.com/apache/beam/pull/29319

[LAZY CONSENSUS] Deprecate Euphoria extension

2023-11-02 Thread Jan Lukavský

Hi,

according to discussion [1], because no objections were raised and the 
overall usage (artifact download stats) is negligible compared to other 
Beam artifacts, I'll proceed with deprecating the Euphoria extension, 
unless there are any objections within 72 hours (excluding weekend).


Best,

 Jan

[1] https://lists.apache.org/thread/n1h1z95pvrfybd6v46xzybzc1y4xd4j3



Re: Processing time watermarks in KinesisIO

2023-11-01 Thread Jan Lukavský
> That is a fair point, but I don't think we can guarantee that we have 
a timestamp embedded in the record. (Or is there some stable kafka 
metadata we could use here, I'm not that familiar with what kafka 
guarantees). We could require it to be opt-in given the caveats.


Kafka (and Kinesis) offer an ingestion timestamp, which can also be set 
by users (at least in the Kafka case, I don't know the details of 
Kinesis). This is what KinesisIO actually uses as event timestamp - the 
"approximate arrival timestamp".


> The tricky bit is how the user specifies the watermark, unless they can 
guarantee the custom timestamps are monotonically ordered (at least 
within a partition).


If assigned by the broker, the log append time is monotonic within a 
partition (unless brokers in a cluster have clocks seriously out of 
sync, in which case a leader re-election could cause time to go 
backwards). I went through the code in KafkaIO and it also has a large 
amount of deprecated code with "use withLogAppendTime instead" comments 
(the associated issue even has only three digits [1] :)). I would tend to 
remove the processing-time timestamping and watermarking altogether, 
but let users specify it manually, provided they know the consequences.
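
To make the per-partition monotonicity argument concrete, here is a minimal sketch in plain Python (not Beam or KafkaIO code). The class name `PartitionedWatermark` and its methods are hypothetical and exist only for illustration. It assumes timestamps never decrease within a partition and takes the watermark as the minimum of the last timestamp seen in each partition.

```python
class PartitionedWatermark:
    """Hypothetical helper: tracks a watermark over several partitions."""

    def __init__(self, partitions):
        # Last log-append timestamp observed per partition (None = nothing read yet).
        self._last = {p: None for p in partitions}

    def observe(self, partition, log_append_ts):
        last = self._last[partition]
        # Assumption: broker-assigned timestamps are monotonic per partition.
        if last is not None and log_append_ts < last:
            raise ValueError(f"timestamp went backwards in partition {partition!r}")
        self._last[partition] = log_append_ts

    def watermark(self):
        # No watermark until every partition has produced at least one record.
        if any(ts is None for ts in self._last.values()):
            return None
        return min(self._last.values())


wm = PartitionedWatermark([0, 1])
wm.observe(0, 100)
before_all_seen = wm.watermark()
wm.observe(1, 90)
after_both = wm.watermark()
wm.observe(1, 120)
after_advance = wm.watermark()
```

The sketch ignores idle partitions, which a real source would have to handle separately, otherwise the watermark would stall.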


 Jan

[1] https://issues.apache.org/jira/browse/BEAM-591

On 10/31/23 21:36, Robert Bradshaw via dev wrote:

On Tue, Oct 31, 2023 at 10:28 AM Jan Lukavský  wrote:

On 10/31/23 17:44, Robert Bradshaw via dev wrote:

There are really two cases that make sense:

(1) We read the event timestamps from the kafka records themselves and
have some external knowledge that guarantees (or at least provides a
very good heuristic) about what the timestamps of unread messages
could be in the future to set the watermark. This could possibly
involve knowing that the timestamps in a partition are monotonically
increasing, or somehow have bounded skew.

+1

(2) We use processing time as both the watermark and for setting the
event timestamp on produced messages. From this point on we can safely
reason about the event time.

This is where I have some doubts. We can reason about event time, but it
is not stable across Pipeline restarts (if there is any downstream
processing that depends on event time and is not shielded by
@RequiresStableInput, it might give different results on restarts).

That is a fair point, but I don't think we can guarantee that we have
a timestamp embedded in the record. (Or is there some stable kafka
metadata we could use here, I'm not that familiar with what kafka
guarantees). We could require it to be opt-in given the caveats.


Is
there any specific reason not to use option 1)? Do we have to provide
alternative 2), given that users can implement it themselves (we would
need to allow users to specify a custom timestamp function, but that
should be done in all cases)?

The tricky bit is how the user specifies the watermark, unless they
can guarantee the custom timestamps are monotonically ordered (at
least within a partition).


The current state seems a bit broken if I understand correctly.

+1

On Tue, Oct 31, 2023 at 1:16 AM Jan Lukavský  wrote:

I think that instead of deprecating and creating new version, we could leverage 
the proposed update compatibility flag for this [1]. I still have some doubts 
if the processing-time watermarking (and event-time assignment) makes sense. Do 
we have a valid use-case for that? This is actually the removed 
SYNCHRONIZED_PROCESSING_TIME time domain, which is problematic - restarts of 
Pipelines cause timestamps to change and hence make *every* DoFn potentially 
non-deterministic, which would be unexpected side-effect. This makes me wonder 
if we should remove this policy altogether (deprecate or use the update 
compatibility flag, so that the policy throws exception in new version).

The crucial point would be to find a use-case where it is actually helpful to 
use such policy.
Any ideas?

   Jan

[1] https://lists.apache.org/thread/29r3zv04n4ooq68zzvpw6zm1185n59m2

On 10/27/23 18:33, Alexey Romanenko wrote:

Ahh, ok, I see.

Yes, it looks like a bug. So, I'd propose to deprecate the old "processing 
time" watermark policy, which we can remove later, and create a new fixed one.

PS: It’s recommended to use "org.apache.beam.sdk.io.aws2.kinesis.KinesisIO" 
instead of the deprecated "org.apache.beam.sdk.io.kinesis.KinesisIO" one.

—
Alexey

On 27 Oct 2023, at 17:42, Jan Lukavský  wrote:

No, I'm referring to this [1] policy which has unexpected (and hardly avoidable 
on the user-code side) data loss issues. The problem is that assigning 
timestamps to elements and watermarks is completely decoupled and unrelated, 
which I'd say is a bug.

   Jan

[1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--

On 10/27/23 16:51, Alexey Romanenko wrote:

Why not just to create a custom watermark policy for that? Or you mean to make 
it as a def

Re: Processing time watermarks in KinesisIO

2023-10-31 Thread Jan Lukavský

On 10/31/23 17:44, Robert Bradshaw via dev wrote:

There are really two cases that make sense:

(1) We read the event timestamps from the kafka records themselves and
have some external knowledge that guarantees (or at least provides a
very good heuristic) about what the timestamps of unread messages
could be in the future to set the watermark. This could possibly
involve knowing that the timestamps in a partition are monotonically
increasing, or somehow have bounded skew.

+1


(2) We use processing time as both the watermark and for setting the
event timestamp on produced messages. From this point on we can safely
reason about the event time.
This is where I have some doubts. We can reason about event time, but it 
is not stable across Pipeline restarts (if there is any downstream 
processing that depends on event time and is not shielded by 
@RequiresStableInput, it might give different results on restarts). Is 
there any specific reason not to use option 1)? Do we have to provide 
alternative 2), given that users can implement it themselves (we would 
need to allow users to specify a custom timestamp function, but that 
should be done in all cases)?


The current state seems a bit broken if I understand correctly.

+1


On Tue, Oct 31, 2023 at 1:16 AM Jan Lukavský  wrote:

I think that instead of deprecating and creating new version, we could leverage 
the proposed update compatibility flag for this [1]. I still have some doubts 
if the processing-time watermarking (and event-time assignment) makes sense. Do 
we have a valid use-case for that? This is actually the removed 
SYNCHRONIZED_PROCESSING_TIME time domain, which is problematic - restarts of 
Pipelines cause timestamps to change and hence make *every* DoFn potentially 
non-deterministic, which would be unexpected side-effect. This makes me wonder 
if we should remove this policy altogether (deprecate or use the update 
compatibility flag, so that the policy throws exception in new version).

The crucial point would be to find a use-case where it is actually helpful to 
use such policy.
Any ideas?

  Jan

[1] https://lists.apache.org/thread/29r3zv04n4ooq68zzvpw6zm1185n59m2

On 10/27/23 18:33, Alexey Romanenko wrote:

Ahh, ok, I see.

Yes, it looks like a bug. So, I'd propose to deprecate the old "processing 
time" watermark policy, which we can remove later, and create a new fixed one.

PS: It’s recommended to use "org.apache.beam.sdk.io.aws2.kinesis.KinesisIO" 
instead of the deprecated "org.apache.beam.sdk.io.kinesis.KinesisIO" one.

—
Alexey

On 27 Oct 2023, at 17:42, Jan Lukavský  wrote:

No, I'm referring to this [1] policy which has unexpected (and hardly avoidable 
on the user-code side) data loss issues. The problem is that assigning 
timestamps to elements and watermarks is completely decoupled and unrelated, 
which I'd say is a bug.

  Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--

On 10/27/23 16:51, Alexey Romanenko wrote:

Why not just to create a custom watermark policy for that? Or you mean to make 
it as a default policy?

—
Alexey

On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:


Hi,

when discussing about [1] we found out, that the issue is actually caused by 
processing time watermarks in KinesisIO. Enabling this watermark outputs 
watermarks based on current processing time, _but event timestamps are derived 
from ingestion timestamp_. This can cause unbounded lateness when processing 
backlog. I think this setup is error-prone and will likely cause data loss due 
to dropped elements. This can be solved in two ways:

  a) deprecate processing time watermarks, or

  b) modify KinesisIO's watermark policy so that it assigns event timestamps as 
well (the processing-time watermark policy would have to derive event 
timestamps from processing-time).

I'd prefer option b) , but it might be a breaking change, moreover I'm not sure 
if I understand the purpose of processing-time watermark policy, it might be 
essentially ill defined from the beginning, thus it might really be better to 
remove it completely. There is also a related issue [2].

Any thoughts on this?

  Jan

[1] https://github.com/apache/beam/issues/25975

[2] https://github.com/apache/beam/issues/28760





Re: Processing time watermarks in KinesisIO

2023-10-31 Thread Jan Lukavský
I think that instead of deprecating and creating a new version, we could 
leverage the proposed update compatibility flag for this [1]. I still 
have some doubts whether processing-time watermarking (and event-time 
assignment) makes sense. Do we have a valid use-case for that? This is 
actually the removed SYNCHRONIZED_PROCESSING_TIME time domain, which is 
problematic: restarts of Pipelines cause timestamps to change and 
hence make *every* DoFn potentially non-deterministic, which would be 
an unexpected side-effect. This makes me wonder if we should remove this 
policy altogether (deprecate it, or use the update compatibility flag so 
that the policy throws an exception in the new version).
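
The restart problem can be illustrated with a small plain-Python sketch (not Beam code; `window_counts` is a hypothetical helper). It assumes event timestamps are taken from the processing time at which each record is read, and that a downstream step counts records per fixed window. The same four records end up in different windows depending on when they happen to be read.

```python
def window_counts(records, read_times, window_size):
    """Count records per fixed window, using the read time as event timestamp."""
    counts = {}
    for _record, ts in zip(records, read_times):
        window_start = (ts // window_size) * window_size
        counts[window_start] = counts.get(window_start, 0) + 1
    return counts


records = ["a", "b", "c", "d"]
# First run: all four records are read within the window [0, 60).
first_run = window_counts(records, [10, 20, 30, 40], window_size=60)
# After a restart the same records are re-read 40 seconds later.
after_restart = window_counts(records, [50, 60, 70, 80], window_size=60)
```

With timestamps taken from the records themselves, both runs would produce the same per-window counts.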


The crucial point would be to find a use-case where it is actually 
helpful to use such a policy.

Any ideas?

 Jan

[1] https://lists.apache.org/thread/29r3zv04n4ooq68zzvpw6zm1185n59m2

On 10/27/23 18:33, Alexey Romanenko wrote:

Ahh, ok, I see.

Yes, it looks like a bug. So, I'd propose to deprecate the old 
"processing time" watermark policy, which we can remove later, and 
create a new fixed one.


PS: It’s recommended to use 
"org.apache.beam.sdk.io.aws2.kinesis.KinesisIO" instead of the 
deprecated "org.apache.beam.sdk.io.kinesis.KinesisIO" one.


—
Alexey


On 27 Oct 2023, at 17:42, Jan Lukavský  wrote:

No, I'm referring to this [1] policy which has unexpected (and hardly 
avoidable on the user-code side) data loss issues. The problem is 
that assigning timestamps to elements and watermarks is completely 
decoupled and unrelated, which I'd say is a bug.


 Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--


On 10/27/23 16:51, Alexey Romanenko wrote:
Why not just to create a custom watermark policy for that? Or you 
mean to make it as a default policy?


—
Alexey


On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:


Hi,

when discussing about [1] we found out, that the issue is actually 
caused by processing time watermarks in KinesisIO. Enabling this 
watermark outputs watermarks based on current processing time, _but 
event timestamps are derived from ingestion timestamp_. This can 
cause unbounded lateness when processing backlog. I think this 
setup is error-prone and will likely cause data loss due to dropped 
elements. This can be solved in two ways:


 a) deprecate processing time watermarks, or

 b) modify KinesisIO's watermark policy so that it assigns event 
timestamps as well (the processing-time watermark policy would have 
to derive event timestamps from processing-time).


I'd prefer option b) , but it might be a breaking change, moreover 
I'm not sure if I understand the purpose of processing-time 
watermark policy, it might be essentially ill defined from the 
beginning, thus it might really be better to remove it completely. 
There is also a related issue [2].


Any thoughts on this?

 Jan

[1] https://github.com/apache/beam/issues/25975

[2] https://github.com/apache/beam/issues/28760





Re: Processing time watermarks in KinesisIO

2023-10-27 Thread Jan Lukavský
No, I'm referring to this [1] policy which has unexpected (and hardly 
avoidable on the user-code side) data loss issues. The problem is that 
assigning timestamps to elements and watermarks is completely decoupled 
and unrelated, which I'd say is a bug.


 Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--


On 10/27/23 16:51, Alexey Romanenko wrote:
Why not just to create a custom watermark policy for that? Or you mean 
to make it as a default policy?


—
Alexey


On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:


Hi,

when discussing about [1] we found out, that the issue is actually 
caused by processing time watermarks in KinesisIO. Enabling this 
watermark outputs watermarks based on current processing time, _but 
event timestamps are derived from ingestion timestamp_. This can 
cause unbounded lateness when processing backlog. I think this setup 
is error-prone and will likely cause data loss due to dropped 
elements. This can be solved in two ways:


 a) deprecate processing time watermarks, or

 b) modify KinesisIO's watermark policy so that it assigns event 
timestamps as well (the processing-time watermark policy would have 
to derive event timestamps from processing-time).


I'd prefer option b) , but it might be a breaking change, moreover 
I'm not sure if I understand the purpose of processing-time watermark 
policy, it might be essentially ill defined from the beginning, thus 
it might really be better to remove it completely. There is also a 
related issue [2].


Any thoughts on this?

 Jan

[1] https://github.com/apache/beam/issues/25975

[2] https://github.com/apache/beam/issues/28760



Processing time watermarks in KinesisIO

2023-10-27 Thread Jan Lukavský

Hi,

while discussing [1], we found out that the issue is actually 
caused by processing-time watermarks in KinesisIO. Enabling this 
policy emits watermarks based on the current processing time, _but 
event timestamps are derived from the ingestion timestamp_. This can cause 
unbounded lateness when processing a backlog. I think this setup is 
error-prone and will likely cause data loss due to dropped elements. 
This can be solved in two ways:


 a) deprecate processing-time watermarks, or

 b) modify KinesisIO's watermark policy so that it assigns event 
timestamps as well (the processing-time watermark policy would have to 
derive event timestamps from processing time).


I'd prefer option b), but it might be a breaking change. Moreover, I'm 
not sure I understand the purpose of the processing-time watermark 
policy; it might have been essentially ill-defined from the beginning, so it 
might really be better to remove it completely. There is also a related 
issue [2].
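
A simplified model of the failure mode, in plain Python rather than Beam code, may help. The names `Record` and `count_dropped` are hypothetical. The sketch assumes fixed windows, one record read per second, a watermark equal to the current processing time, and a record being dropped once its window end plus allowed lateness is not after the watermark.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    ingestion_ts: int  # seconds; stands in for the approximate arrival timestamp


def count_dropped(records, processing_start, allowed_lateness, window_size):
    """Count records whose window has already expired when they are read."""
    dropped = 0
    for i, rec in enumerate(records):
        watermark = processing_start + i  # processing-time watermark
        window_end = (rec.ingestion_ts // window_size + 1) * window_size
        if window_end + allowed_lateness <= watermark:
            dropped += 1
    return dropped


# A backlog of 10 records ingested during the first 10 minutes ...
backlog = [Record(ingestion_ts=t) for t in range(0, 600, 60)]
# ... but read one hour later: every window is already closed.
late = count_dropped(backlog, processing_start=3600,
                     allowed_lateness=0, window_size=60)

# Records read as they arrive are not affected.
fresh = [Record(ingestion_ts=3600 + i) for i in range(10)]
live = count_dropped(fresh, processing_start=3600,
                     allowed_lateness=0, window_size=60)
```

In this model the whole backlog is dropped while live traffic is untouched, which is why the problem tends to show up only when a pipeline falls behind.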


Any thoughts on this?

 Jan

[1] https://github.com/apache/beam/issues/25975

[2] https://github.com/apache/beam/issues/28760


Re: Reshuffle PTransform Design Doc

2023-10-20 Thread Jan Lukavský
Yes, I'm aware that Beam is not defined in terms of runner convenience, 
but in terms of data transform primitives. On the other hand, looking 
at it from a specific perspective: even though a stateless shuffle does 
not change the data itself, it changes the distribution of the data with 
respect to partitioning, which is a property of the data as well. Not 
only the data itself, but also any metadata about it might be viewed as 
something that characterizes a PCollection and as something that can be 
manipulated. Hence a transform can be given proper data-related 
semantics, even though it is a no-op with regard to the actual _contents_ of 
a PCollection. Having said that, my point was that if Redistribute were a 
defined fundamental primitive, it would immediately follow that it 
should not be implemented as GBK-unGBK (at least not without extra care), 
because that leads to problems with the stateful GBK introducing 
unexpected side-effects, which from my understanding was the initial 
problem that started this thread.
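
As a rough illustration of the "GBK-unGBK" shape, here is a sketch in plain Python (not the Beam SDK; `group_by_random_key` and `ungroup` are hypothetical names). It assumes elements are grouped under a randomly chosen shard key and then flattened again, so the contents stay the same while the partitioning changes.

```python
import random
from collections import defaultdict


def group_by_random_key(elements, num_shards, seed=0):
    """'GBK' step: group elements under a randomly chosen shard key."""
    rng = random.Random(seed)
    shards = defaultdict(list)
    for element in elements:
        shards[rng.randrange(num_shards)].append(element)
    return dict(shards)


def ungroup(shards):
    """'unGBK' step: drop the keys and flatten the groups."""
    return [element for key in sorted(shards) for element in shards[key]]


data = list(range(20))
shards = group_by_random_key(data, num_shards=4)
redistributed = ungroup(shards)
```

The sketch deliberately leaves out what makes the real grouping stateful (windowing, triggers, buffering), which is where the side-effects mentioned above come from.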


Best,

 Jan

On 10/19/23 20:26, Kenneth Knowles wrote:

Well I accidentally conflated "stateful" and "persisting", but anyhow
yea we aren't targeting to have one Beam primitive for each thing that
is probably a runner primitive.

On Thu, Oct 19, 2023 at 2:25 PM Kenneth Knowles  wrote:

On Fri, Oct 13, 2023 at 12:51 PM Jan Lukavský  wrote:

Hi,

I think nearly everything has already been said in this thread, but ... it 
is time for Friday discussions. :)

Today I recalled a discussion we had a long time ago, when we were 
designing Euphoria (btw, deprecating and removing it is still on my todo list, 
I should create a vote thread for that). We had 4 primitives:

  a) non-shuffle, stateless ~ stateless ParDo

  b) shuffle, stateful ~ stateful ParDo, with the ability (under the right circumstances, 
 i.e. defined event-time trigger, defined state merge function, ...) to be performed in a 
"combinable way".

  c) shuffle, stateless ~ Reshuffle

  d) non-shuffle, stateful - nope, makes no sense :) - part of the "combinable 
stateful shuffle operation"

  e) union ~ Flatten

Turns out you can build everything bottom up from these.

Now, the not-so-well defined semantics of Reshuffle (Redistribute) might arise 
from the fact it is not a primitive. Stateless shuffling of data is definitely 
a primitive of all runners.

Not Dataflow :-)

But more importantly, Beam primitives are deliberately chosen to be
fundamental data operations, not physical plan steps that a runner
might use. In other words, Beam is decidedly _not_ a library for
building composites that eventually are constructed from runner
primitives. It is more like SQL in that it is a library for building
composites that eventually are constructed from fundamental operations
on data, that every engine (like every RDBMS) will be able to
implement in its own way.

Kenn


Therefore here goes the question - should Redistribute be a primitive and not 
be built up from other transforms?

Best,

  Jan

On 10/6/23 21:12, Kenneth Knowles wrote:



On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský  wrote:


On 10/6/23 15:11, Kenneth Knowles wrote:



On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský  wrote:

Hi,

there is also one other thing to mention in relation to Reshuffle/RequiresStableInput, 
namely that our current implementation of RequiresStableInput can break without 
Reshuffle in some corner cases on most portable runners, at least with Java 
GreedyPipelineFuser, see [1]. The only way to work around this currently is inserting 
Reshuffle (or any other fusion-break transform) directly before the stable DoFn 
(Reshuffle is handy, because it does not change the data). I think we should either 
somehow fix the issue [1] or include fusion break as a mandatory requirement for the new 
Redistribute transform as well (at least with some variant) or possibly add a new 
"hint" for non-optional fusion breaking.

This is actually the bug we have wanted to fix for years - redistribute has 
nothing to do with checkpointing or stable input and Reshuffle incorrectly 
merges the two concepts.

I agree that we couldn't make any immediate change that will break a runner. I 
believe runners that depend on Reshuffle to provide stable input will also 
provide stable input after GroupByKey. Since the SDK expansion of Reshuffle 
will still contain a GBK, those runners' functionality will be unchanged.

I don't yet have a firm opinion between these approaches:

1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs if 
needed). With some flag so that users can use the old wrong behavior for update 
compatibility.
2. Add a Redistribute transform to the SDKs that has the right behavior and 
leave Reshuffle as it is.
1+2. Add the Redistribute transform but also make Reshuffle call it, so 
Reshuffle also gets the new behavior, with the same flag so that users can use 
the old wrong behavior for update 

Re: [YAML] Aggregations

2023-10-19 Thread Jan Lukavský

On 10/19/23 19:41, Robert Bradshaw via dev wrote:

On Thu, Oct 19, 2023 at 10:25 AM Jan Lukavský  wrote:

On 10/19/23 18:28, Robert Bradshaw via dev wrote:

On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  wrote:

Rill is definitely SQL-oriented but I think that's going to be the most common. 
Dataframes are explicitly modeled on the relational approach so that's going to 
look a lot like SQL,

I think pretty much any approach that fits here is going to be
relational, meaning you choose a set of columns to group on, a set of
columns to aggregate, and how to aggregate. The big open question is
what syntax to use for the "how."

This might be already answered, if so, pardon my ignorance, but what is
the goal this declarative approach is trying to solve? Is it meant to be
more expressive or equally expressive than SQL? And if more, how much more?

I'm not sure if you're asking about YAML in general, or the particular
case of aggregation, but I can answer both.

For the larger Beam YAML project, it's trying to solve the problem
that SQL is (and I'll admit this is somewhat subjective here) good at
expressing the T part of ETL, but not the other parts. For example,
the simple data movement use case of (say) reading from PubSub and
dumping into BigQuery is not well expressed in terms of SQL. SQL is
also fairly awkward when it comes to defining UDFs and TDFs and
non-linear pipelines (especially those with fanout). There are of
course other tools in this space (dbt comes to mind, and there's been
some investigation on how to make dbt play well with Beam). The other
niche it is trying to solve is that installing and learning a full SDK
is heavyweight and overkill for creating pipelines that are simply
wiring together pre-defined transforms.


I think FlinkSQL solves the problem of E and L in SQL via CREATE TABLE 
and INSERT statements. I agree with the fanout part, though 
CREATE (TEMPORARY) TABLE AS SELECT ... could possibly solve that 
as well.



As for the more narrow case of aggregations, I think being similarly
expressive as SQL is fine, though it'd be good to make custom UDAFs
more natural. Originally I was thinking that just having SqlTransform
might be sufficient, but it feels like a big hammer to reach for every
time I just want to sum over one or two columns.


Yes, defining UDFs and UDAFs is painful, that was the motivation of my 
question. It also defines what the syntax for such a UDAF would need to 
look like. It would require breaking UDAFs down into several primitive 
UDFs and then using a functional style to declare them. Most of the time 
it would probably be sufficient to use simplified CombineFn semantics 
with the accumulator limited to a primitive type (long, double, 
string, maybe array?). I suppose declaring a full-blown stateful DoFn 
(timers, generic state, ...) is out of scope.
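To make the "simplified CombineFn semantics" idea concrete, here is a minimal plain-Python sketch. It only mirrors the four-method shape of a combiner (create, add, merge, extract) with a single number as the accumulator; `SumFn` and `combine` are hypothetical names used for illustration and are not part of any Beam API.

```python
from functools import reduce


class SumFn:
    """Hypothetical simplified combiner: the accumulator is a single number."""

    def create_accumulator(self):
        return 0

    def add_input(self, acc, value):
        return acc + value

    def merge_accumulators(self, accs):
        return sum(accs)

    def extract_output(self, acc):
        return acc


def combine(fn, shards):
    """Fold each shard independently, then merge the partial accumulators."""
    partials = [
        reduce(fn.add_input, shard, fn.create_accumulator()) for shard in shards
    ]
    return fn.extract_output(fn.merge_accumulators(partials))


# Three shards stand in for data spread over three workers.
result = combine(SumFn(), [[1, 2], [3], [4, 5]])
print(result)
```

Because the accumulator is a primitive value, each of the four steps could in principle be declared as a small expression, which is the kind of decomposition discussed above.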




Re: [YAML] Aggregations

2023-10-19 Thread Jan Lukavský

On 10/19/23 18:28, Robert Bradshaw via dev wrote:

On Thu, Oct 19, 2023 at 9:00 AM Byron Ellis  wrote:

Rill is definitely SQL-oriented but I think that's going to be the most common. 
Dataframes are explicitly modeled on the relational approach so that's going to 
look a lot like SQL,

I think pretty much any approach that fits here is going to be
relational, meaning you choose a set of columns to group on, a set of
columns to aggregate, and how to aggregate. The big open question is
what syntax to use for the "how."
This might be already answered, if so, pardon my ignorance, but what is 
the goal this declarative approach is trying to solve? Is it meant to be 
more expressive or equally expressive than SQL? And if more, how much more?


Dataframe aggregation is probably a good example to look at. Here we
have pandas and R in particular as concrete instances. It should also
be easy to support different aggregations over different (or the same)
columns. Pandas can take a list of (or mapping to) functions in its
groupby().agg(). R doesn't seem to make this very easy...


which leaves us with S-style formulas (which I like but are pretty niche)

I'm curious, what are these?


  and I guess pivot tables coming from the spreadsheet world. Does make me 
wonder what Rails' ORM looks like these days (I last used v4), it had some 
aggregation support and was pretty declarative...

On Wed, Oct 18, 2023 at 6:06 PM Robert Bradshaw  wrote:

On Wed, Oct 18, 2023 at 5:06 PM Byron Ellis  wrote:

Is it worth taking a look at similar prior art in the space?

+1. Pointers welcome.


The first one that comes to mind is Transform, but with the dbt labs 
acquisition that spec is a lot harder to find. Rill is pretty similar though.

Rill seems to be very SQL-based.


On Wed, Oct 18, 2023 at 1:12 PM Robert Bradshaw via dev  
wrote:

Beam Yaml has good support for IOs and mappings, but one key missing
feature for even writing a WordCount is the ability to do Aggregations
[1]. While the traditional Beam primitive is GroupByKey (and
CombineValues), we're eschewing KVs in the notion of more schema'd
data (which has some precedent in our other languages, see the links
below). The key components the user needs to specify are (1) the key
fields on which the grouping will take place, (2) the fields
(expressions?) involved in the aggregation, and (3) what aggregating
fn to use.

A straw-man example could be something like

type: Aggregating
config:
  key: [field1, field2]
  aggregating:
    total_cost:
      fn: sum
      value: cost
    max_cost:
      fn: max
      value: cost

This would basically correspond to the SQL expression

"SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
from table GROUP BY field1, field2"

(though I'm not requiring that we use this as an implementation
strategy). I do not think we need a separate (non aggregating)
Grouping operation, this can be accomplished by having a concat-style
combiner.
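As a rough illustration of the intended semantics (not of any implementation strategy), the straw-man config can be read as the following plain-Python sketch. The `BUILTIN_FNS` registry and the `aggregate` helper are hypothetical names introduced only for this example.

```python
from collections import defaultdict

# Hypothetical registry of built-in aggregation fns, keyed by the `fn` name.
BUILTIN_FNS = {"sum": sum, "max": max, "min": min, "count": len}


def aggregate(rows, key, aggregating):
    """Group `rows` (dicts) by the `key` fields and apply each aggregation."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in key)].append(row)
    out = []
    for key_values, members in groups.items():
        result = dict(zip(key, key_values))
        for out_field, spec in aggregating.items():
            values = [m[spec["value"]] for m in members]
            result[out_field] = BUILTIN_FNS[spec["fn"]](values)
        out.append(result)
    return out


rows = [
    {"field1": "a", "field2": 1, "cost": 10},
    {"field1": "a", "field2": 1, "cost": 5},
    {"field1": "b", "field2": 2, "cost": 7},
]
# Same shape as the `config` block of the straw-man example.
config = {
    "key": ["field1", "field2"],
    "aggregating": {
        "total_cost": {"fn": "sum", "value": "cost"},
        "max_cost": {"fn": "max", "value": "cost"},
    },
}
result = aggregate(rows, **config)
```

Each output row carries the key fields plus one field per entry under `aggregating`, matching the columns of the SQL statement.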

There are still some open questions here, notably around how to
specify the aggregation fns themselves. We could of course provide a
number of built-ins (like SQL does). This gets into the question of
how and where to document this complete set, but some basics should
take us pretty far. Many aggregators, however, are parameterized (e.g.
quantiles); where do we put the parameters? We could go with something
like

fn:
  type: ApproximateQuantiles
  config:
    n: 10

but others are even configured by functions themselves (e.g. LargestN
that wants a comparator Fn). Maybe we decide not to support these
(yet?)

One thing I think we should support, however, is referencing custom
CombineFns. We have some precedent for this with our Fns from
MapToFields, where we accept things like inline lambdas and external
references. Again the topic of how to configure them comes up, as
these custom Fns are more likely to be parameterized than Map Fns
(though, to be clear, perhaps it'd be good to allow parameterization of
MapFns as well). Maybe we allow

language: python  # like MapToFields (and here it'd be harder to mix
                  # and match per Fn)
fn:
  type: ???
  # should these be nested as config?
  name: fully.qualified.name
  path: /path/to/defining/file
  args: [...]
  kwargs: {...}

which would invoke the constructor.
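A minimal sketch of what "invoke the constructor" could mean for the `name`, `args` and `kwargs` fields, using only the Python standard library. The `build_fn` helper is hypothetical, the `path` field is not handled here, and a standard-library class stands in for a user-defined CombineFn.

```python
import importlib


def build_fn(name, args=(), kwargs=None):
    """Resolve a fully qualified name and call it with args/kwargs."""
    module_name, _, attr = name.rpartition(".")
    cls = getattr(importlib.import_module(module_name), attr)
    return cls(*args, **(kwargs or {}))


# fractions.Fraction is only a stand-in for a custom CombineFn class.
spec = {"name": "fractions.Fraction", "args": [3, 4], "kwargs": {}}
fn = build_fn(spec["name"], spec["args"], spec["kwargs"])
print(fn)
```

Whether these fields live directly under `fn` or nested under a `config` key is exactly the open question raised above; the lookup itself would be the same either way.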

I'm also open to other ways of naming/structuring these essential
parameters if it makes things more clear.

- Robert


Java: 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
Python: 
https://beam.apache.org/documentation/transforms/python/aggregation/groupby
Typescript: 
https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html

[1] One can of course use SqlTransform for this, but I'm leaning
towards offering something more native.


Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

2023-10-18 Thread Jan Lukavský

Hi,

my two cents on this. While it would be perfectly possible to use consumer 
groups in KafkaIO, it has its own issues. The most visible would be that 
using subscriptions might introduce unnecessary duplicates in downstream 
processing. The reason for this is that a consumer in a consumer group 
might be reassigned partitions and/or be reset to a different offset 
based on conditions that are out of control of the consumer itself. This 
might lead to an inability to successfully commit the offset of a bundle 
after it has been sent downstream, while the 
processed-but-not-yet-committed input element might be reprocessed by a 
different worker due to partition rebalance. This adds unnecessary 
complexity with questionable benefits (observability of lag in a 
consumer group and possible automatic discovery of new partitions in a 
topic).
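The duplicate scenario can be sketched as a tiny simulation. This is a toy model in plain Python with no Kafka client involved; `read_bundle`, the bundle size and the single-partition log are assumptions made only to illustrate the sequence of events.

```python
def read_bundle(log, committed_offset, size):
    """Return the records of one bundle starting at the committed offset."""
    return log[committed_offset:committed_offset + size]


log = list(range(10))      # records of a single partition
committed = 0              # offset stored in the (simulated) consumer group
downstream = []

# Worker A processes a bundle and emits it downstream ...
bundle_a = read_bundle(log, committed, 5)
downstream.extend(bundle_a)
# ... but a rebalance revokes the partition before A can commit.

# Worker B is assigned the partition and resumes from the committed offset.
bundle_b = read_bundle(log, committed, 5)
downstream.extend(bundle_b)
committed += len(bundle_b)

# Every record of the first bundle reached downstream twice.
duplicates = len(downstream) - len(set(downstream))
print(duplicates)
```

In this model the first five records are emitted twice, because the commit and the downstream emission are not atomic with respect to the reassignment.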


For these reasons I'd say that it would be possible to introduce a 
different IO (e.g. KafkaConsumerGroupIO), which could be added to Beam 
itself or (perhaps) some extension, but it makes little sense to 
introduce this into KafkaIO itself.


Hope this helps,

 Jan

On 10/18/23 05:49, Shaojie Wu wrote:

Can't agree with Shahar Frank more

On 2023/04/19 18:17:15 Shahar Frank wrote:

Hi Daniel,

I think I've already answered these in a previous email but let me answer
them again.

I was specifically responding to quoted points from your last email. I

really don't understand why you, as a user, care if the implementation of
the framework is using consumer groups or not as long as it has the
throughput you need and is correct. If there is something specific this
would be useful for, like monitoring or metrics, it seems a reasonable
feature request to me to ask to reflect the progress state in a kafka
consumer group, but not to use the underlying assignment mechanism for the
reasons stated above.


I do care for a couple of reasons:
1) Introducing risk with a technology that no one knows in the company vs.
a technology people know and trust (i.e. Kafka Consumer Groups)
2) A multitude of alerting, monitoring and other observability tools that
are using consumer groups will not be usable and new solutions would be
required
3) Expert knowhow on managing (and sometimes fixing) issues with Kafka in
the company will become useless - and this in turn introduces risk to
projects

If you want to run in a single machine application mode, you can try

setting the `flinkMaster` parameter to `[local]`, which should launch an
inline flink runner just for your pipeline. If you want to have a scaling
out cluster per-application, you can launch a repeatable flink cluster with
kubernetes on a per-application basis pretty easily.


I do agree that a Flink cluster is a great solution and have maintained a
few in my time.
Sadly in our use case I have to consider constraints set by security and
platform teams and that will take time.
By the time these follow through it is very likely that the decision to use
Beam would have fallen in favor of other solutions (i.e. Kafka Streams,
Camel and others) and this would be a shame in my view. It is very unlikely
that once taken this decision would be reversed for a long time.

Given that a Flink cluster is not an option for me at this point I have
been trying to push a solution where instances of a Beam pipeline are run
"disconnected" using a DirectRunner or (better even) a FlinkRunner in local
standalone mode (as you're suggesting) - and like you suggested we are
running those using a K8s deployment which allows us to scale up as required.
The issue is if more than one pod attempts to run the pipeline - they will
not split the partitions between them but rather each would consume ALL
partitions and the output would include as many duplications as the number
of pods. This solution will then not be able to scale up horizontally.
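The scaling problem can be illustrated with a small plain-Python model. It is a sketch under simplifying assumptions: `assign_all` models independent pipeline instances that each read every partition, and `assign_group` models a round-robin split roughly like the one a consumer group would provide. Both helper names are hypothetical.

```python
def assign_all(partitions, num_pods):
    """Every pod reads every partition (independent pipeline instances)."""
    return [list(partitions) for _ in range(num_pods)]


def assign_group(partitions, num_pods):
    """Round-robin split, roughly what a consumer group would provide."""
    return [partitions[i::num_pods] for i in range(num_pods)]


def total_reads(assignment):
    """Count how many partition reads happen across all pods."""
    return sum(len(p) for p in assignment)


partitions = list(range(6))
independent = assign_all(partitions, 3)
grouped = assign_group(partitions, 3)

print(total_reads(independent), total_reads(grouped))
```

With three pods and six partitions, the independent instances perform three times the reads (and produce three copies of the output), while the grouped assignment reads each partition exactly once.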

That is exactly why I'm trying to suggest using consumer groups.
In this attempt I created - here

-  I've already shown it is possible (albeit I admit with limitations such
as you described) to use consumer groups and effectively allow our use case
to run on a scaled up K8s deployment of DirectRunners.

And again finally my question is why should Kafka be treated differently
from other messaging systems like SQS and PubSub for which it seems Beam
does not attempt to manage the distribution strategy as well as the mechanism
for managing processed (committed) messages?

If Beam is able to perform as well with them managing these couldn't the
same be applied to Kafka?

Cheers,
Shahar.

--

Shahar Frank

srf...@gmail.com

+447799561438

--





On Wed, 19 Apr 2023 at 13:19, Daniel Collins  wrote:


Hello,

I was specifically responding to quoted points from your last email. I
really don't understand why you, as a user, care if the implementation of
the framework is using consumer groups or 

Re: [ANNOUNCE] New Committer: Sam Whittle

2023-10-17 Thread Jan Lukavský

Congrats Sam!

On 10/16/23 22:34, Austin Bennett wrote:

Thanks, Sam!

On Mon, Oct 16, 2023 at 12:39 PM XQ Hu via dev  
wrote:


Congratulations!

On Mon, Oct 16, 2023 at 1:58 PM Ahmet Altay via dev
 wrote:

Congratulations Sam!

On Mon, Oct 16, 2023 at 10:42 AM Byron Ellis via dev
 wrote:

Congrats Sam!

On Mon, Oct 16, 2023 at 10:32 AM Chamikara Jayalath via
dev  wrote:

Congrats Sam!

On Mon, Oct 16, 2023 at 9:32 AM Kenneth Knowles
 wrote:

Hi all,

Please join me and the rest of the Beam PMC in
welcoming a new
committer: Sam Whittle (scwhit...@apache.org).

Sam has been contributing to Beam since 2016! In
particular, he specializes in streaming and the
Dataflow Java worker but his contributions expand
naturally from there to the Java SDK, IOs, and
even a bit of Python :-). Sam has contributed a
ton of code over the years and is generous in code
review and sharing his expertise.

Considering his contributions to the project over
this timeframe, the
Beam PMC trusts Sam with the responsibilities of a
Beam committer. [1]

Thank you Sam! And we are looking to see more of
your contributions!

Kenn, on behalf of the Apache Beam PMC

[1]

https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer


Re: [ANNOUNCE] New Committer: Byron Ellis

2023-10-17 Thread Jan Lukavský

Congrats Byron!

On 10/16/23 22:33, Austin Bennett wrote:

thanks, Byron!

On Mon, Oct 16, 2023 at 12:38 PM XQ Hu via dev  
wrote:


Congratulations!

On Mon, Oct 16, 2023 at 1:58 PM Ahmet Altay via dev
 wrote:

Congratulations Byron!

On Mon, Oct 16, 2023 at 10:35 AM Tomo Suzuki via dev
 wrote:

Congratulations!


On Mon, Oct 16, 2023 at 1:33 PM Chamikara Jayalath via dev
 wrote:

Congrats Byron!

On Mon, Oct 16, 2023 at 9:32 AM Kenneth Knowles
 wrote:

Hi all,

Please join me and the rest of the Beam PMC in
welcoming a new
committer: Byron Ellis (b...@apache.org).

Byron has been with Beam for over a year now. You
may all know him as the guy who just decided to
write a Swift SDK :-). In addition to that big
contribution Byron has also fixed plenty of bugs,
prototyped DBT-style pipeline authoring, and
participated in our collective decision-making
process.

Considering his contributions to the project over
this timeframe, the
Beam PMC trusts Byron with the responsibilities of
a Beam committer. [1]

Thank you Byron! And we are looking to see more of
your contributions!

Kenn, on behalf of the Apache Beam PMC

[1]

https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer



-- 
Regards,

Tomo


Re: [DISCUSS] Drop Euphoria extension

2023-10-16 Thread Jan Lukavský
Sure, that would be probably the preferred way to go. For now, I'm 
trying to get some feedback, if there are some real-world users who 
might miss the API. Currently, the only value I see is that Euphoria 
adds an additional level of indirection for user code. The expansion 
goes like this:


 Euphoria Pipeline -> runtime provided translators -> vanilla Beam 
Pipeline -> runner


Hence code written using Euphoria extension can be modified at runtime 
(Pipeline construction time) using dependency injection, which brings 
the value that users can modify (typically optimize) Pipelines without 
actually modifying the business logic. On the other hand I'm not sure if 
this justifies the complexity of the extension. Were this the only 
value, it should be possible to implement such dynamic expansion either 
into Java SDK core or as a different light-weight extension.
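The indirection described above can be sketched in a few lines of plain Python. This is only a toy model: the translator registry, the `expand` helper and the step names are hypothetical and do not reflect the extension's actual classes.

```python
def default_translators():
    """Hypothetical registry: operator name -> expansion into 'vanilla' steps."""
    return {
        "ReduceByKey": lambda op: ["GroupByKey", "ParDo(reduce)"],
        "MapElements": lambda op: ["ParDo(map)"],
    }


def expand(operators, translators):
    """Expand each high-level operator using the injected translators."""
    steps = []
    for op in operators:
        steps.extend(translators[op](op))
    return steps


# The business logic is written once ...
business_logic = ["MapElements", "ReduceByKey"]
plain = expand(business_logic, default_translators())

# ... and a different translator is injected at construction time.
optimized = default_translators()
optimized["ReduceByKey"] = lambda op: ["Combine.perKey(reduce)"]
tuned = expand(business_logic, optimized)

print(plain, tuned)
```

The `business_logic` list is identical in both expansions; only the injected registry differs, which is the value (and the added complexity) being weighed here.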


 Jan

On 10/16/23 15:10, Alexey Romanenko wrote:

Can we just deprecate it for a while and then remove completely?

—
Alexey


On 13 Oct 2023, at 18:59, Jan Lukavský  wrote:

Hi,

it has been some time since Euphoria extension [1] has been adopted by Beam as a possible 
"Java 8 API". Beam has evolved from that time a lot, the current API seems 
actually more elegant than the original Euphoria's and last but not least, it has no 
maintainers and no known users. If there are any users, please speak up!

Otherwise I'd like to propose to drop it from codebase, I'll start a vote 
thread during next week, if there are no objections.

Best,

  Jan

[1] https://beam.apache.org/documentation/sdks/java/euphoria/



[DISCUSS] Drop Euphoria extension

2023-10-13 Thread Jan Lukavský

Hi,

it has been some time since Euphoria extension [1] has been adopted by 
Beam as a possible "Java 8 API". Beam has evolved from that time a lot, 
the current API seems actually more elegant than the original Euphoria's 
and last but not least, it has no maintainers and no known users. If 
there are any users, please speak up!


Otherwise I'd like to propose to drop it from codebase, I'll start a 
vote thread during next week, if there are no objections.


Best,

 Jan

[1] https://beam.apache.org/documentation/sdks/java/euphoria/



Re: Reshuffle PTransform Design Doc

2023-10-13 Thread Jan Lukavský

Hi,

I think nearly everything has already been said in this thread, but 
... it is time for Friday discussions. :)


Today I recalled a discussion we had a long time ago, when we were 
designing Euphoria (btw, deprecating and removing it is still on my todo 
list, I should create a vote thread for that). We had 4 primitives:


 a) non-shuffle, stateless ~ stateless ParDo

 b) shuffle, stateful ~ stateful ParDo, with the ability (under the 
right circumstances,  i.e. defined event-time trigger, defined state 
merge function, ...) to be performed in a "combinable way".


 c) shuffle, stateless ~ Reshuffle

 d) non-shuffle, stateful - nope, makes no sense :) - part of the 
"combinable stateful shuffle operation"


 e) union ~ Flatten

Turns out you can build everything bottom up from these.

Now, the not-so-well defined semantics of Reshuffle (Redistribute) might 
arise from the fact it is not a primitive. Stateless shuffling of data 
is definitely a primitive of all runners.


Therefore here goes the question - should Redistribute be a primitive 
and not be built up from other transforms?


Best,

 Jan

On 10/6/23 21:12, Kenneth Knowles wrote:



On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský  wrote:


On 10/6/23 15:11, Kenneth Knowles wrote:



On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský  wrote:

Hi,

there is also one other thing to mention with relation to
Reshuffle/RequiresStableInput and that is that our current
implementation of RequiresStableInput can break without
Reshuffle in some corner cases on most portable runners, at
least with Java GreedyPipelineFuser, see [1]. The only way to
work around this currently is inserting Reshuffle (or any
other fusion-break transform) directly before the stable DoFn
(Reshuffle is handy, because it does not change the data). I
think we should either somehow fix the issue [1] or include
fusion break as a mandatory requirement for the new
Redistribute transform as well (at least with some variant)
or possibly add a new "hint" for non-optional fusion breaking.

This is actually the bug we have wanted to fix for years -
redistribute has nothing to do with checkpointing or stable input
and Reshuffle incorrectly merges the two concepts.

I agree that we couldn't make any immediate change that will
break a runner. I believe runners that depend on Reshuffle to
provide stable input will also provide stable input after
GroupByKey. Since the SDK expansion of Reshuffle will still
contain a GBK, those runners' functionality will be unchanged.

I don't yet have a firm opinion between these approaches:

1. Adjust the Java SDK implementation of Reshuffle (and maybe
other SDKs if needed). With some flag so that users can use the
old wrong behavior for update compatibility.
2. Add a Redistribute transform to the SDKs that has the right
behavior and leave Reshuffle as it is.
1+2. Add the Redistribute transform but also make Reshuffle call
it, so Reshuffle also gets the new behavior, with the same flag
so that users can use the old wrong behavior for update
compatibility.

All of these will leave "Reshuffle for RequiresStableInput" alone
for now. The options that include (2) will move us a little
closer to migrating to a "better" future state.


I might not have expressed myself the right way. I understand that
Reshuffle having "stable input" functionality is a non-portable
side effect. It would be nice to get rid of it and my impression
from this thread was that we would try to deprecate Reshuffle and
introduce Redistribute which will not have such semantics. All of
this is fine, the problem is that we currently (in some corner cases)
rely on Reshuffle *even though* Pipeline uses
@RequiresStableInput. That is due to the fact that Reshuffle also
ensures fusion breaking.  Fusing non-deterministic DoFn with
stable DoFn breaks the stable input property, because runners can
ensure stability only at the input of executable stage. Therefore
we would either need to:

 a) define Redistribute as being an unconditional fusion break
boundary, or

 b) define some other transform or hint to be able to enforce
fusion breaking

Otherwise I'd be in favor of 2 and deprecation of Reshuffle.


Just to be very clear - my goal right now is to just give Reshuffle 
consistent semantics. Even for the old "stable input + redistribute" 
use of Reshuffle, the semantics are inconsistent/undefined and the 
Java SDK expansion is wrong. Changing things having to do with stable 
input is not part of what I am trying to change right now. But it is 
fine to do things that prepare for that.


Kenn

 Jan



Any votes? Any other options?

Kenn

 Jan

[1] https:/

Re: Reshuffle PTransform Design Doc

2023-10-06 Thread Jan Lukavský


On 10/6/23 15:11, Kenneth Knowles wrote:



On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský  wrote:

Hi,

there is also one other thing to mention with relation to
Reshuffle/RequiresStableInput and that is that our current
implementation of RequiresStableInput can break without Reshuffle
in some corner cases on most portable runners, at least with Java
GreedyPipelineFuser, see [1]. The only way to work around this
currently is inserting Reshuffle (or any other fusion-break
transform) directly before the stable DoFn (Reshuffle is handy,
because it does not change the data). I think we should either
somehow fix the issue [1] or include fusion break as a mandatory
requirement for the new Redistribute transform as well (at least
with some variant) or possibly add a new "hint" for non-optional
fusion breaking.

This is actually the bug we have wanted to fix for years - 
redistribute has nothing to do with checkpointing or stable input and 
Reshuffle incorrectly merges the two concepts.


I agree that we couldn't make any immediate change that will break a 
runner. I believe runners that depend on Reshuffle to provide stable 
input will also provide stable input after GroupByKey. Since the SDK 
expansion of Reshuffle will still contain a GBK, those runners' 
functionality will be unchanged.


I don't yet have a firm opinion between these approaches:

1. Adjust the Java SDK implementation of Reshuffle (and maybe other 
SDKs if needed). With some flag so that users can use the old wrong 
behavior for update compatibility.
2. Add a Redistribute transform to the SDKs that has the right 
behavior and leave Reshuffle as it is.
1+2. Add the Redistribute transform but also make Reshuffle call it, 
so Reshuffle also gets the new behavior, with the same flag so that 
users can use the old wrong behavior for update compatibility.


All of these will leave "Reshuffle for RequiresStableInput" alone for 
now. The options that include (2) will move us a little closer to 
migrating to a "better" future state.


I might not have expressed myself the right way. I understand that Reshuffle 
having "stable input" functionality is a non-portable side effect. It 
would be nice to get rid of it and my impression from this thread was 
that we would try to deprecate Reshuffle and introduce Redistribute 
which will not have such semantics. All of this is fine, the problem is that 
we currently (in some corner cases) rely on Reshuffle *even though* 
Pipeline uses @RequiresStableInput. That is due to the fact that 
Reshuffle also ensures fusion breaking.  Fusing non-deterministic DoFn 
with stable DoFn breaks the stable input property, because runners can 
ensure stability only at the input of executable stage. Therefore we 
would either need to:


 a) define Redistribute as being an unconditional fusion break boundary, or

 b) define some other transform or hint to be able to enforce fusion 
breaking


Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
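To illustrate why fusion matters for stable input, here is a toy model in plain Python. It is not the real GreedyPipelineFuser: `fuse` simply groups consecutive transforms into stages and cuts at any transform listed as a fusion break, and `has_stable_input` encodes the assumption stated above that stability can be ensured only at the input of an executable stage. All names are hypothetical.

```python
def fuse(transforms, breaks):
    """Greedily fuse consecutive transforms; start a new stage after a break."""
    stages, current = [], []
    for name in transforms:
        if name in breaks:
            # The break transform itself forms the stage boundary.
            if current:
                stages.append(current)
            current = []
        else:
            current.append(name)
    if current:
        stages.append(current)
    return stages


def has_stable_input(stages, stable_dofn):
    """Stable only if the DoFn is the first transform of its stage."""
    return any(stage[0] == stable_dofn for stage in stages)


fused = fuse(["NonDeterministic", "Stable"], breaks={"Reshuffle"})
split = fuse(["NonDeterministic", "Reshuffle", "Stable"], breaks={"Reshuffle"})

print(has_stable_input(fused, "Stable"), has_stable_input(split, "Stable"))
```

In this model, option a) corresponds to adding Redistribute to `breaks`, and option b) corresponds to adding some other transform or hint to the same set.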

 Jan



Any votes? Any other options?

Kenn

 Jan

[1] https://github.com/apache/beam/issues/24655

On 10/5/23 21:01, Robert Burke wrote:

Reshuffle/redistribute being a transform has the benefit of
allowing existing runners that aren't updated to be aware of the
new urns to rely on an SDK side implementation, which may be more
expensive than what the runner is able to do with that awareness.

Aka: it gives purpose to the fallback implementations.

On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles  wrote:

Another perspective, ignoring runners' custom implementations
and non-Java SDKs could be that the semantics are perfectly
well defined: it is a composite and its semantics are defined
by its implementation in terms of primitives. It is just that
this expansion is not what we want so we should not use it
(and also we shouldn't use "whatever the implementation does"
as a spec for anything we care about).

On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles
 wrote:

I totally agree. I am motivated right now by the fact
that it is already used all over the place but with no
consistent semantics. Maybe it is simpler to focus on
just making the minimal change, which would basically be
to update the expansion of the Reshuffle in the Java SDK.

Kenn

On Thu, Oct 5, 2023 at 11:39 AM John Casey
 wrote:

Given that this is a hint, I'm not sure redistribute
should be a PTransform as opposed to some other way
to hint to a runner.

I'm not sure of what the syntax of that would be, but
a semantic no-op transform that the runner may or may
not do anything with is odd.

On Thu, Oct 5, 2023 

Re: Reshuffle PTransform Design Doc

2023-10-06 Thread Jan Lukavský

Hi,

there is also one other thing to mention with relation to 
Reshuffle/RequiresStableInput and that is that our current 
implementation of RequiresStableInput can break without Reshuffle in 
some corner cases on most portable runners, at least with Java 
GreedyPipelineFuser, see [1]. The only way to work around this currently 
is inserting Reshuffle (or any other fusion-break transform) directly 
before the stable DoFn (Reshuffle is handy, because it does not change 
the data). I think we should either somehow fix the issue [1] or include 
fusion break as a mandatory requirement for the new Redistribute 
transform as well (at least with some variant) or possibly add a new 
"hint" for non-optional fusion breaking.


 Jan

[1] https://github.com/apache/beam/issues/24655

On 10/5/23 21:01, Robert Burke wrote:
Reshuffle/redistribute being a transform has the benefit of allowing 
existing runners that aren't updated to be aware of the new urns to 
rely on an SDK side implementation, which may be more expensive than 
what the runner is able to do with that awareness.


Aka: it gives purpose to the fallback implementations.

On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles  wrote:

Another perspective, ignoring runners' custom implementations and
non-Java SDKs could be that the semantics are perfectly well
defined: it is a composite and its semantics are defined by its
implementation in terms of primitives. It is just that this
expansion is not what we want so we should not use it (and also we
shouldn't use "whatever the implementation does" as a spec for
anything we care about).

On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles 
wrote:

I totally agree. I am motivated right now by the fact that it
is already used all over the place but with no consistent
semantics. Maybe it is simpler to focus on just making the
minimal change, which would basically be to update the
expansion of the Reshuffle in the Java SDK.

Kenn

On Thu, Oct 5, 2023 at 11:39 AM John Casey
 wrote:

Given that this is a hint, I'm not sure redistribute
should be a PTransform as opposed to some other way to
hint to a runner.

I'm not sure of what the syntax of that would be, but a
semantic no-op transform that the runner may or may not do
anything with is odd.

On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles
 wrote:

So a high level suggestion from Robert that I want to
highlight as a top-post:

Instead of focusing on just fixing the SDKs and
runners Reshuffle, this could be an opportunity to
introduce Redistribute which was proposed in the
long-ago thread. The semantics are identical but it is
more clear that it /only/ is a hint about
redistributing data and there is no expectation of a
checkpoint.

This new name may also be an opportunity to maintain
update compatibility (though this may actually be
leaving unsafe code in user's hands) and/or
separate @RequiresStableInput/checkpointing uses of
Reshuffle from redistribution-only uses of Reshuffle.

Any other thoughts on this one high level bit?

Kenn

On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles
 wrote:


On Wed, Oct 4, 2023 at 7:45 PM Robert Burke
 wrote:

LGTM.

It looks like the Go SDK already adheres to these
semantics as well for the reference impl (well,
reshuffle/redistribute_randomly, _by_key isn't
implemented in the Go SDK, and only uses the
existing unqualified reshuffle URN [0]).

The original strategy, and then for every
element, the original Window, TS, and Pane are
all serialized, shuffled, and then
deserialized downstream.


https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65


https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145

Prism at the moment vacuously implements
reshuffle by omitting the node, and rewriting
the inputs and outputs [1], as it's a local
runner with single transform per bundle
execution, but I was intending to make it a
fusion break regardless.  Ultimately prism's
"test" variant will default to executing the
   

Re: [VOTE] Release 2.51.0, release candidate #1

2023-10-05 Thread Jan Lukavský

+1 (binding)

Tested Java SDK with Flink Runner on own test-cases.

 Jan

On 10/4/23 21:10, Bruno Volpato via dev wrote:

+1 (non-binding).

Tested with https://github.com/GoogleCloudPlatform/DataflowTemplates 
(Java SDK 11, Dataflow Runner using both legacy and v2).


Thanks Kenn!

On Wed, Oct 4, 2023 at 3:03 PM Robert Bradshaw via dev 
 wrote:


+1 (binding)

Verified artifacts and signatures and tested a simple
python pipeline in a fresh environment with a wheel.

On Wed, Oct 4, 2023 at 8:05 AM Ritesh Ghorse via dev
 wrote:

+1 (non-binding) validated Go SDK quickstart and Python
Streaming quickstart on Dataflow runner.

Thanks!

On Tue, Oct 3, 2023 at 5:40 PM XQ Hu via dev
 wrote:

+1 (non-binding). Tested the simple dataflow ML starter
job with

https://github.com/google/dataflow-ml-starter/actions/runs/6397130175/job/17364408813.

On Tue, Oct 3, 2023 at 2:29 PM Danny McCormick via dev
 wrote:

All Beam Python versions 2.50 and greater run
exclusively on Dataflow runner v2, so we don't need to
test v1 anymore. I'll delete those rows from the
spreadsheet

On Tue, Oct 3, 2023 at 2:25 PM Svetak Sundhar
 wrote:

+1 Non Binding

Tested Python Direct Runner and Dataflow Runner as
well.

On the spreadsheet, I came across "Dataflow v1
(until 2.49.0, inclusive)", and do not fully
understand what this means.

Does this mean
(1) we shouldn't be testing on Dataflow runner v1
for releases after 2.49 or
(2) make sure we test on runner v1 for this release?

Thanks in advance for the clarification,

Svetak Sundhar
Data Engineer
svetaksund...@google.com



On Tue, Oct 3, 2023 at 2:14 PM Danny McCormick via
dev  wrote:

+1 (non-binding)

Tested python/ML execution with

https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_huggingface.ipynb
(interactive runner) and

https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/automatic_model_refresh.ipynb
(Dataflow runner).

Thanks,
Danny

On Tue, Oct 3, 2023 at 1:58 PM Kenneth Knowles
 wrote:

Hi everyone,

Please review and vote on the release
candidate #1 for the version 2.51.0, as
follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please
provide specific comments)

Reviewers are encouraged to test their own
use cases with the release candidate, and
vote +1 if no issues are found. Only PMC
member votes will count towards the final
vote, but votes from all community members
are encouraged and helpful for finding
regressions; you can either test your own
use cases or use cases from the validation
sheet [10].

The complete staging area is available for
your review, which includes:

  * GitHub Release notes [1],
  * the official Apache source release to
be deployed to dist.apache.org
 [2], which is
signed with the key with fingerprint
 [3],
  * all artifacts to be deployed to the
Maven Central Repository [4],
  * source code tag "v1.2.3-RC3" [5],
  * website pull request listing the
release [6], the blog post [6], and
publishing the API reference manual [7].
  * Java artifacts were built with Gradle
GRADLE_VERSION 

Re: [ANNOUNCE] New PMC Member: Robert Burke

2023-10-04 Thread Jan Lukavský

Congrats Robert!

On 10/4/23 10:29, Alexey Romanenko wrote:

Congrats Robert, very well deserved!

—
Alexey


On 4 Oct 2023, at 00:39, Austin Bennett  wrote:

Thanks for all you do @Robert Burke  !

On Tue, Oct 3, 2023 at 12:53 PM Ahmed Abualsaud 
 wrote:


Congrats Rebo!

On 2023/10/03 18:39:47 Kenneth Knowles wrote:
> Hi all,
>
> Please join me and the rest of the Beam PMC in welcoming Robert
Burke <
> lostl...@apache.org> as our newest PMC member.
>
> Robert has been a part of the Beam community since 2017. He is
our resident
> Gopher, producing the Go SDK and most recently the local,
portable, Prism
> runner. Robert has presented on Beam many times, having written
not just
> core Beam code but quite interesting pipelines too :-)
>
> Congratulations Robert and thanks for being a part of Apache Beam!
>
> Kenn, on behalf of the Beam PMC (which now includes Robert)
>



Re: [ANNOUNCE] New PMC Member: Valentyn Tymofieiev

2023-10-04 Thread Jan Lukavský

Congrats Valentyn!

On 10/4/23 10:26, Alexey Romanenko wrote:

Congrats Valentyn, very well deserved!

—
Alexey


On 4 Oct 2023, at 00:39, Austin Bennett  wrote:

Thanks for everything @Valentyn Tymofieiev 
 !


On Tue, Oct 3, 2023 at 12:53 PM Ahmed Abualsaud 
 wrote:


Congrats Valentyn!

On 2023/10/03 18:39:49 Kenneth Knowles wrote:
> Hi all,
>
> Please join me and the rest of the Beam PMC in welcoming Valentyn
> Tymofieiev  as our newest PMC member.
>
> Valentyn has been contributing to Beam since 2017. Notable
highlights
> include his work on the Python SDK and also in our container
management.
> Valentyn also is involved in many discussions around Beam's
infrastructure
> and community processes. If you look through Valentyn's
history, you will
> see an abundance of the most critical maintenance work that is
the beating
> heart of any project.
>
> Congratulations Valentyn and thanks for being a part of Apache
Beam!
>
> Kenn, on behalf of the Beam PMC (which now includes Valentyn)
>



Re: [ANNOUNCE] New PMC Member: Alex Van Boxel

2023-10-04 Thread Jan Lukavský

Congrats Alex!

On 10/4/23 10:29, Alexey Romanenko wrote:

Congrats Alex, very well deserved!

—
Alexey


On 4 Oct 2023, at 00:38, Austin Bennett  wrote:

Thanks for all you do, @Alex Van Boxel  !

On Tue, Oct 3, 2023 at 12:50 PM Ahmed Abualsaud via dev 
 wrote:


Congratulations!

On Tue, Oct 3, 2023 at 3:48 PM Byron Ellis via dev
 wrote:

Congrats!

On Tue, Oct 3, 2023 at 12:40 PM Danielle Syse via dev
 wrote:

Congratulations Alex!! Definitely well deserved!

On Tue, Oct 3, 2023 at 2:57 PM Ahmet Altay via dev
 wrote:

Congratulations Alex! Well deserved!

On Tue, Oct 3, 2023 at 11:54 AM Ritesh Ghorse via dev
 wrote:

Congratulations Alex!

On Tue, Oct 3, 2023 at 2:54 PM Danny McCormick
via dev  wrote:

Congrats Alex, this is well deserved!

On Tue, Oct 3, 2023 at 2:50 PM Jack McCluskey
via dev  wrote:

Congrats, Alex!

On Tue, Oct 3, 2023 at 2:49 PM XQ Hu via
dev  wrote:

Congratulations, Alex!

On Tue, Oct 3, 2023 at 2:40 PM
Kenneth Knowles  wrote:

Hi all,

Please join me and the rest of
the Beam PMC in welcoming Alex
Van Boxel
 as our
newest PMC member.

Alex has been with Beam since
2016, very early in the life of
the project. Alex has
contributed code, design ideas,
and perhaps most importantly been
a huge part of organizing Beam
Summits, and of course presenting
at them as well. Alex really
brings the ASF community spirit
to Beam.

Congratulations Alex and thanks
for being a part of Apache Beam!

Kenn, on behalf of the Beam PMC
(which now includes Alex)



Re: Runner Bundling Strategies

2023-09-27 Thread Jan Lukavský
Understood, thanks. This is fairly unintuitive from the "checkpoint 
barrier" viewpoint, because when such a runner fails, it simply restarts 
from the checkpoint as if it were a fresh start - i.e. calling Setup. 
It makes sense that a bundle-based runner might not do that.


It seems to follow that we cannot infer any optimizations purely from 
static analysis of the DoFn. Should we consider adding an opt-out 
parameter for bundle atomicity (which also has implications) and for 
the bundle in-flight element watermark hold? I'd say yes, because 
otherwise we might restrict some runners too much.


On 9/27/23 20:24, Reuven Lax via dev wrote:
Using Setup would cause data loss in this case. A runner can always 
retry a bundle, and I don't believe Setup is called again in this 
case. If the user initiated the hashmap in setup, this would cause 
records to be completely lost whenever bundles retry.


On Wed, Sep 27, 2023 at 11:20 AM Jan Lukavský  wrote:

What is the reason to rely on StartBundle and not Setup in this
case? If the life-cycle of bundle is not "closed" (i.e. start -
finish), then it seems to be ill defined and Setup should do?
I'm trying to think of non-caching use-cases of
StartBundle-FinishBundle, are there such cases? I'd say yes, but
I'm struggling a little to find a specific example that cannot be
solved using Setup or lazy init.

On 9/27/23 19:58, Reuven Lax via dev wrote:

DoFns are allowed to be non deterministic, so they don't have to
yield the "same" output.

The example I'm thinking of is where users perform some
"best-effort" deduplication by creating a hashmap in StartBundle
and removing duplicates. This is usually done purely for
performance to reduce shuffle size, as opposed to a guaranteed
RemoveDuplicates. This scenario doesn't require FinishBundle,
though it does require a StartBundle.

On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles
 wrote:



On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev
 wrote:

Yes, not including FinishBundle in ParDoPayload seems
like a mistake. Though absence of FinishBundle doesn't
mean that one can assume that elements in a bundle don't
affect subsequent bundle elements (i.e. there might still
be caching!)


Well for a DoFn to be correct, it has to yield the same (or
"the same as much as the user expects it to be the same")
output regardless of order of processing or bundling so a
runner or SDK harness can definitely take a bunch of elements
and process them however it wants if there's
no @FinishBundle. I think that's what Jan is getting at -
adding a @FinishBundle is the user placing a new restriction
on the runner. Technically probably have to
include @StartBundle in that consideration.

Kenn


On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles
 wrote:



On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský
 wrote:

Hi Kenn and Reuven,

I agree with all these points. The only issue
here seems to be that FlinkRunner does not
fulfill these constraints. This is a bug that can
be fixed, though we need to change some defaults,
as 1000 ms default bundle "duration" for lower
traffic Pipelines can be too much. We are also
probably missing some @ValidatesRunner tests for
this. I created [1] and [2] to track this.

One question still remains, the bundle vs.
element life-cycle is relevant only for cases
where processing of element X can affect
processing of element Y later in the same bundle.
Once this influence is ruled out (i.e. no
caching), this information can result in runner
optimization that yields better performance.
Should we consider propagating this information
from user code to the runner?

Yes!

This was the explicit goal of the move to
annotation-driven DoFn in
https://s.apache.org/a-new-dofn to make it so that
the SDK and runner can get good information about
what the DoFn requirements are.

When there is no @FinishBundle method, the runner can
make additional optimizations. This should have been
included in the ParDoPayload in the proto when we
moved to portable pipelines. I cannot remember if
there was a good reason that we did not do so. Maybe
we (incorrectly) thought that this

Re: Runner Bundling Strategies

2023-09-27 Thread Jan Lukavský
What is the reason to rely on StartBundle and not Setup in this case? If 
the life-cycle of bundle is not "closed" (i.e. start - finish), then it 
seems to be ill defined and Setup should do?
I'm trying to think of non-caching use-cases of 
StartBundle-FinishBundle, are there such cases? I'd say yes, but I'm 
struggling a little to find a specific example that cannot be solved 
using Setup or lazy init.


On 9/27/23 19:58, Reuven Lax via dev wrote:
DoFns are allowed to be non deterministic, so they don't have to yield 
the "same" output.


The example I'm thinking of is where users perform some "best-effort" 
deduplication by creating a hashmap in StartBundle and removing 
duplicates. This is usually done purely for performance to reduce 
shuffle size, as opposed to a guaranteed RemoveDuplicates. This 
scenario doesn't require FinishBundle, though it does require a 
StartBundle.
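
The best-effort deduplication pattern described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the Beam API: the class `BestEffortDedup` and the driver `run_bundle` are hypothetical names that merely mimic the setup / start_bundle / process life-cycle, and the "retry" is simulated by running the same bundle twice.

```python
class BestEffortDedup:
    """Hypothetical DoFn-like class; it only mimics the life-cycle, not the Beam API."""

    def setup(self):
        # Called once per instance; assumed NOT to be called again on a bundle retry.
        self.setup_calls = getattr(self, "setup_calls", 0) + 1

    def start_bundle(self):
        # Fresh per-bundle cache, so a retried bundle starts with an empty set.
        self.seen = set()

    def process(self, element):
        if element in self.seen:
            return []  # duplicate within this bundle: drop it
        self.seen.add(element)
        return [element]


def run_bundle(fn, bundle):
    """Toy driver: runs one bundle attempt and returns its output."""
    fn.start_bundle()
    out = []
    for element in bundle:
        out.extend(fn.process(element))
    return out


fn = BestEffortDedup()
fn.setup()
bundle = ["a", "b", "a", "c", "b"]
first_attempt = run_bundle(fn, bundle)  # imagine this attempt fails and is discarded
retry = run_bundle(fn, bundle)          # the retry does not call setup again
```

Because the set is recreated in `start_bundle`, the retry emits the same elements as the first attempt. Had the set been created in `setup` instead, the retry would treat every element as a duplicate and emit nothing.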


On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles  wrote:



On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev
 wrote:

Yes, not including FinishBundle in ParDoPayload seems like a
mistake. Though absence of FinishBundle doesn't mean that one
can assume that elements in a bundle don't affect subsequent
bundle elements (i.e. there might still be caching!)


Well for a DoFn to be correct, it has to yield the same (or "the
same as much as the user expects it to be the same") output
regardless of order of processing or bundling so a runner or SDK
harness can definitely take a bunch of elements and process them
however it wants if there's no @FinishBundle. I think that's what
Jan is getting at - adding a @FinishBundle is the user placing a
new restriction on the runner. Technically probably have to
include @StartBundle in that consideration.

Kenn


On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles
 wrote:



On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský
 wrote:

Hi Kenn and Reuven,

I agree with all these points. The only issue here
seems to be that FlinkRunner does not fulfill these
constraints. This is a bug that can be fixed, though
we need to change some defaults, as 1000 ms default
bundle "duration" for lower traffic Pipelines can be
too much. We are also probably missing some
@ValidatesRunner tests for this. I created [1] and
[2] to track this.

One question still remains, the bundle vs. element
life-cycle is relevant only for cases where processing
of element X can affect processing of element Y later
in the same bundle. Once this influence is ruled out
(i.e. no caching), this information can result in
runner optimization that yields better performance.
Should we consider propagating this information from
user code to the runner?

Yes!

This was the explicit goal of the move to
annotation-driven DoFn in https://s.apache.org/a-new-dofn
to make it so that the SDK and runner can get good
information about what the DoFn requirements are.

When there is no @FinishBundle method, the runner can make
additional optimizations. This should have been included
in the ParDoPayload in the proto when we moved to portable
pipelines. I cannot remember if there was a good reason
that we did not do so. Maybe we (incorrectly) thought that
this was an issue that only the Java SDK harness needed to
know about.

Kenn

[1] https://github.com/apache/beam/issues/28649

[2] https://github.com/apache/beam/issues/28650

On 9/25/23 18:31, Reuven Lax via dev wrote:



On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský
 wrote:


On 9/23/23 18:16, Reuven Lax via dev wrote:

Two separate things here:

1. Yes, a watermark can update in the middle of
a bundle.
2. The records in the bundle themselves will
prevent the watermark from updating as they are
still in flight until after finish bundle.
Therefore simply caching the records should
always be watermark safe, regardless of the
runner. You will only run into problems if you
try and move timestamps "backwards" - which is
why Beam strongly discourages this.

This is not aligned with FlinkRunner's
implementation. And I actually think it is not
   

Re: Runner Bundling Strategies

2023-09-25 Thread Jan Lukavský

Hi Kenn and Reuven,

I agree with all these points. The only issue here seems to be that 
FlinkRunner does not fulfill these constraints. This is a bug that can 
be fixed, though we need to change some defaults, as 1000 ms default 
bundle "duration" for lower traffic Pipelines can be too much. We are 
also probably missing some @ValidatesRunner tests for this. I created 
[1] and [2] to track this.


One question still remains, the bundle vs. element life-cycle is 
relevant only for cases where processing of element X can affect 
processing of element Y later in the same bundle. Once this influence is 
ruled out (i.e. no caching), this information can result in runner 
optimization that yields better performance. Should we consider 
propagating this information from user code to the runner?


[1] https://github.com/apache/beam/issues/28649

[2] https://github.com/apache/beam/issues/28650

On 9/25/23 18:31, Reuven Lax via dev wrote:



On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:


On 9/23/23 18:16, Reuven Lax via dev wrote:

Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the
watermark from updating as they are still in flight until after
finish bundle. Therefore simply caching the records should always
be watermark safe, regardless of the runner. You will only run
into problems if you try and move timestamps "backwards" - which
is why Beam strongly discourages this.

This is not aligned with  FlinkRunner's implementation. And I
actually think it is not aligned conceptually.  As mentioned,
Flink does not have the concept of bundles at all. It achieves
fault tolerance via checkpointing, essentially checkpoint barrier
flowing from sources to sinks, safely snapshotting state of each
operator on the way. Bundles are implemented as a somewhat
arbitrary set of elements between two consecutive checkpoints
(there can be multiple bundles between checkpoints). A bundle is
'committed' (i.e. persistently stored and guaranteed not to retry)
only after the checkpoint barrier passes over the elements in the
bundle (every bundle is finished at the very latest exactly before
a checkpoint). But watermark propagation and bundle finalization
are completely unrelated. This might be a bug in the runner, but
requiring a checkpoint for watermark propagation would introduce
insane delays between processing time and watermarks: every
executable stage would delay watermark propagation until a
checkpoint (whose interval is typically on the order of seconds).
This delay would add up after each stage.


It's not bundles that hold up processing, rather it is elements, and 
elements are not considered "processed" until FinishBundle.


You are right about Flink. In many cases this is fine - if Flink rolls 
back to the last checkpoint, the watermark will also roll back, and 
everything stays consistent. So in general, one does not need to wait 
for checkpoints for watermark propagation.


Where things get a bit weirder with Flink is whenever one has external 
side effects. In theory, one should wait for checkpoints before 
letting a Sink flush, otherwise one could end up with incorrect 
outputs (especially with a sink like TextIO). Flink itself recognizes 
this, and that's why they provide TwoPhaseCommitSinkFunction 
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html> which 
waits for a checkpoint. In Beam, this is the reason we introduced 
RequiresStableInput. Of course in practice many Flink users don't do 
this - in which case they are prioritizing latency over data correctness.
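
The idea of holding back side effects until a checkpoint completes can be sketched as follows. This is a toy model under stated assumptions, not Flink's TwoPhaseCommitSinkFunction nor Beam's RequiresStableInput: the class `CheckpointGatedSink` and its method names are hypothetical, and checkpoints and failures are triggered by hand.

```python
class CheckpointGatedSink:
    """Hypothetical sink that exposes output only after a checkpoint completes."""

    def __init__(self):
        self.pending = []    # written since the last checkpoint, not yet visible
        self.committed = []  # externally visible output

    def write(self, record):
        self.pending.append(record)

    def on_checkpoint_complete(self):
        # The checkpoint is durable, so these records will not be replayed.
        self.committed.extend(self.pending)
        self.pending = []

    def on_failure_restore(self):
        # Roll back to the last checkpoint: uncommitted writes are discarded
        # and are produced again when the input is replayed.
        self.pending = []


sink = CheckpointGatedSink()
sink.write("a")
sink.on_checkpoint_complete()
sink.write("b")
sink.on_failure_restore()  # "b" was never visible, so the replay cannot duplicate it
sink.write("b")            # replayed after the restore
sink.on_checkpoint_complete()
```

The cost of this design is latency: nothing becomes visible until the next checkpoint, which is the trade-off against correctness mentioned above.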




Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský 
wrote:

> Watermarks shouldn't be (visibly) advanced until
@FinishBundle is committed, as there's no guarantee that this
work won't be discarded.

There was a thread [1], where the conclusion seemed to be
that updating watermark is possible even in the middle of a
bundle. Actually, handling watermarks is runner-dependent
(e.g. Flink does not store watermarks in checkpoints, they
are always recomputed from scratch on restore).

[1]
https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

On 9/22/23 21:47, Robert Bradshaw via dev wrote:

On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský
 wrote:

On 9/22/23 18:07, Robert Bradshaw via dev wrote:


On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
 wrote:

I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it
seems like you're often going to want to put high
fixed cos

Re: Runner Bundling Strategies

2023-09-25 Thread Jan Lukavský


On 9/23/23 18:16, Reuven Lax via dev wrote:

Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the watermark 
from updating as they are still in flight until after finish bundle. 
Therefore simply caching the records should always be watermark safe, 
regardless of the runner. You will only run into problems if you try 
and move timestamps "backwards" - which is why Beam strongly 
discourages this.
This is not aligned with  FlinkRunner's implementation. And I actually 
think it is not aligned conceptually.  As mentioned, Flink does not have 
the concept of bundles at all. It achieves fault tolerance via 
checkpointing, essentially checkpoint barrier flowing from sources to 
sinks, safely snapshotting state of each operator on the way. Bundles 
are implemented as a somewhat arbitrary set of elements between two 
consecutive checkpoints (there can be multiple bundles between 
checkpoints). A bundle is 'committed' (i.e. persistently stored and 
guaranteed not to retry) only after the checkpoint barrier passes over 
the elements in the bundle (every bundle is finished at the very latest 
exactly before a checkpoint). But watermark propagation and bundle 
finalization are completely unrelated. This might be a bug in the runner, 
but requiring a checkpoint for watermark propagation would introduce 
insane delays between processing time and watermarks: every executable 
stage would delay watermark propagation until a checkpoint (whose interval 
is typically on the order of seconds). This delay would add up after each stage.


Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:

> Watermarks shouldn't be (visibly) advanced until @FinishBundle
is committed, as there's no guarantee that this work won't be
discarded.

There was a thread [1], where the conclusion seemed to be that
updating watermark is possible even in the middle of a bundle.
Actually, handling watermarks is runner-dependent (e.g. Flink does
not store watermarks in checkpoints, they are always recomputed
from scratch on restore).

[1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

On 9/22/23 21:47, Robert Bradshaw via dev wrote:

On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský 
wrote:

On 9/22/23 18:07, Robert Bradshaw via dev wrote:


On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
 wrote:

I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it seems
like you're often going to want to put high fixed cost
things like database connections even outside of the
bundle setup. You really only want to do that once in
the lifetime of the worker itself, not the bundle. Seems
like having that boundary be somewhere other than an
arbitrarily (and probably small in streaming to avoid
latency) group of elements might be more useful? I
suppose this depends heavily on the object lifecycle in
the sdk worker though.


+1. This is the difference between @Setup and @StartBundle.
The start/finish bundle operations should be used for
bracketing element processing that must be committed as a
unit for correct failure recovery (e.g. if elements are
cached in ProcessElement, they should all be emitted in
FinishBundle). On the other hand, things like open database
connections can and likely should be shared across bundles.

This is correct, but the caching between @StartBundle and
@FinishBundle has some problems. First, users need to
manually set watermark hold for min(timestamp in bundle),
otherwise watermark might overtake the buffered elements.


Watermarks shouldn't be (visibly) advanced until @FinishBundle is
committed, as there's no guarantee that this work won't be
discarded.

Users have no option other than using
timer.withOutputTimestamp for that, as we don't have a
user-facing API to set a watermark hold otherwise, thus
in-bundle caching implies a stateful DoFn. The question might
then be, why not use "classical" stateful caching involving
state, as there is full control over the caching in user
code. This gave me an idea: would it be useful to add
the information about caching to the API (e.g. in Java
@StartBundle(caching=true))? That could perhaps solve the
above issues (the runner would know to set the hold, and it
could work with "stateless" DoFns).


Really, this is one of the areas that the streaming/batch
abstraction leaks. In batch it was a common pattern to have local
DoFn instance state that persisted from start to finish bundle,
and these were also used as convenient entr

Re: Runner Bundling Strategies

2023-09-23 Thread Jan Lukavský
> Watermarks shouldn't be (visibly) advanced until @FinishBundle is 
committed, as there's no guarantee that this work won't be discarded.


There was a thread [1], where the conclusion seemed to be that updating 
watermark is possible even in the middle of a bundle. Actually, handling 
watermarks is runner-dependent (e.g. Flink does not store watermarks in 
checkpoints, they are always recomputed from scratch on restore).


[1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

On 9/22/23 21:47, Robert Bradshaw via dev wrote:

On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:

On 9/22/23 18:07, Robert Bradshaw via dev wrote:


On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
 wrote:

I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it seems like
you're often going to want to put high fixed cost things like
database connections even outside of the bundle setup. You
really only want to do that once in the lifetime of the
worker itself, not the bundle. Seems like having that
boundary be somewhere other than an arbitrarily (and probably
small in streaming to avoid latency) group of elements might
be more useful? I suppose this depends heavily on the object
lifecycle in the sdk worker though.


+1. This is the difference between @Setup and @StartBundle. The
start/finish bundle operations should be used for bracketing
element processing that must be committed as a unit for
correct failure recovery (e.g. if elements are cached in
ProcessElement, they should all be emitted in FinishBundle). On
the other hand, things like open database connections can and
likely should be shared across bundles.

This is correct, but the caching between @StartBundle and
@FinishBundle has some problems. First, users need to manually set
watermark hold for min(timestamp in bundle), otherwise watermark
might overtake the buffered elements.


Watermarks shouldn't be (visibly) advanced until @FinishBundle is 
committed, as there's no guarantee that this work won't be discarded.


Users have no option other than using timer.withOutputTimestamp
for that, as we don't have a user-facing API to set a watermark hold
otherwise, thus in-bundle caching implies a stateful DoFn. The
question might then be, why not use "classical" stateful caching
involving state, as there is full control over the caching in user
code. This gave me an idea: would it be useful to add the
information about caching to the API (e.g. in Java
@StartBundle(caching=true))? That could perhaps solve the above
issues (the runner would know to set the hold, and it could work
with "stateless" DoFns).


Really, this is one of the areas that the streaming/batch abstraction 
leaks. In batch it was a common pattern to have local DoFn instance 
state that persisted from start to finish bundle, and these were also 
used as convenient entry points for other operations (like opening 
database connections) 'cause bundles were often "as large as 
possible." With the advent of streaming it makes sense to put this 
in explicitly managed runner state to allow for cross-bundle 
amortization and there's more value in distinguishing between @Setup 
and @StartBundle.


(Were I to do things over I'd probably encourage an API that 
discouraged non-configuration instance state on DoFns altogether, e.g. 
in the spirit of Python context managers (and an equivalent API could 
probably be put together with AutoCloseables in Java) one would have 
something like


ParDo(X)

which would logically (though not necessarily physically) lead to an 
execution like


with X.bundle_processor() as bundle_processor:
  for bundle in bundles:
    with bundle_processor.element_processor() as process:
      for element in bundle:
        process(element)

where the traditional setup/start_bundle/finish_bundle/teardown logic 
would live in the __enter__ and __exit__ methods (made even easier 
with coroutines.) For convenience one could of course provide a raw 
bundle processor or element processor to ParDo if the enter/exit 
contexts are trivial. But this is getting somewhat off-topic...
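
A runnable sketch of the context-manager idea above, using only the standard library. The class `X` and the method names `bundle_processor` and `element_processor` follow the pseudocode in this email; everything else (the `log` list, the doubling in `process`) is an assumption made for illustration and is not an existing Beam API.

```python
from contextlib import contextmanager

log = []  # records life-cycle events so the ordering is visible


class X:
    """Hypothetical transform; method names follow the pseudocode above."""

    @contextmanager
    def bundle_processor(self):
        log.append("setup")         # code before yield plays the role of __enter__
        try:
            yield self
        finally:
            log.append("teardown")  # runs on exit, even after an error

    @contextmanager
    def element_processor(self):
        buffered = []
        log.append("start_bundle")

        def process(element):
            buffered.append(element * 2)

        yield process
        # Reached only if the bundle body raised no exception.
        log.append(f"finish_bundle emits {buffered}")


bundles = [[1, 2], [3]]
with X().bundle_processor() as bundle_processor:
    for bundle in bundles:
        with bundle_processor.element_processor() as process:
            for element in bundle:
                process(element)
```

Here the per-bundle buffer is a local variable of `element_processor`, so there is no instance state that could leak from one bundle into the next.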




Best,
B

On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles
 wrote:

(I notice that you replied only to yourself, but there
has been a whole thread of discussion on this - are you
subscribed to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

It sounds like you want what everyone wants: to have the
biggest bundles possible.

So for bounded data, basically you make even splits of
the data and each split is one bundle. And then dynamic
splitting to redistribute work to eliminate straggler

Re: Runner Bundling Strategies

2023-09-22 Thread Jan Lukavský


On 9/22/23 18:07, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
 wrote:


I've actually wondered about this specifically for streaming... if
you're writing a pipeline there it seems like you're often going
to want to put high fixed cost things like database connections
even outside of the bundle setup. You really only want to do that
once in the lifetime of the worker itself, not the bundle. Seems
like having that boundary be somewhere other than an arbitrarily
(and probably small in streaming to avoid latency) group of
elements might be more useful? I suppose this depends heavily on
the object lifecycle in the sdk worker though.


+1. This is the difference between @Setup and @StartBundle. The 
start/finish bundle operations should be used for bracketing element 
processing that must be committed as a unit for 
correct failure recovery (e.g. if elements are cached in 
ProcessElement, they should all be emitted in FinishBundle). On the 
other hand, things like open database connections can and likely 
should be shared across bundles.
This is correct, but the caching between @StartBundle and @FinishBundle 
has some problems. First, users need to manually set watermark hold for 
min(timestamp in bundle), otherwise watermark might overtake the 
buffered elements. Users have no option other than using 
timer.withOutputTimestamp for that, as we don't have a user-facing API 
to set a watermark hold otherwise, thus in-bundle caching implies a 
stateful DoFn. The question might then be, why not use "classical" 
stateful caching involving state, as there is full control over the 
caching in user code. This gave me an idea: would it be useful to 
add the information about caching to the API (e.g. in Java 
@StartBundle(caching=true))? That could perhaps solve the above issues 
(the runner would know to set the hold, and it could work with "stateless" DoFns).
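
To make the hold on min(timestamp in bundle) concrete, here is a minimal sketch in plain Python. It is an assumption-laden toy, not runner code: `BufferingStage` and its methods are hypothetical names, timestamps are plain integers, and the output watermark is modelled simply as the minimum of the input watermark and the hold.

```python
class BufferingStage:
    """Hypothetical stage that caches elements until the bundle finishes."""

    def __init__(self):
        self.buffer = []  # (timestamp, value) pairs cached within the bundle

    def process(self, timestamp, value):
        self.buffer.append((timestamp, value))

    def watermark_hold(self):
        # Hold at the earliest buffered timestamp; None when nothing is buffered.
        return min((ts for ts, _ in self.buffer), default=None)

    def output_watermark(self, input_watermark):
        hold = self.watermark_hold()
        return input_watermark if hold is None else min(input_watermark, hold)

    def finish_bundle(self):
        out, self.buffer = self.buffer, []
        return out


stage = BufferingStage()
stage.process(12, "x")
stage.process(7, "y")
held = stage.output_watermark(input_watermark=20)      # held back by the element at 7
flushed = stage.finish_bundle()
released = stage.output_watermark(input_watermark=20)  # hold released after the flush
```

Without the hold, the output watermark would already be 20 while the element with timestamp 7 is still buffered, so that element would be late when finally emitted.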



Best,
B

On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles 
wrote:

(I notice that you replied only to yourself, but there has
been a whole thread of discussion on this - are you subscribed
to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

It sounds like you want what everyone wants: to have the
biggest bundles possible.

So for bounded data, basically you make even splits of the
data and each split is one bundle. And then dynamic splitting
to redistribute work to eliminate stragglers, if your engine
has that capability.

For unbounded data, you more-or-less bundle as much as you can
without waiting too long, like Jan described.

Users know to put their high fixed costs in @StartBundle and
then it is the runner's job to put as many calls
to @ProcessElement as possible to amortize.

Kenn

On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
 wrote:

Whoops, I typoed my last email. I meant to write "this
isn't the greatest strategy for high *fixed* cost
transforms", e.g. a transform that takes 5 minutes to get
set up and then maybe a microsecond per input.

I suppose one solution is to move the responsibility for
handling this kind of situation to the user and expect
users to use a bundling transform (e.g. BatchElements [1])
followed by a Reshuffle+FlatMap. Is this what other
runners expect? Just want to make sure I'm not missing
some smart generic bundling strategy that might handle
this for users.

[1]

https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements



On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
 wrote:

Writing a runner and the first strategy for
determining bundling size was to just start with a
bundle size of one and double it until we reach a size
that we expect to take some target per-bundle runtime
(e.g. maybe 10 minutes). I realize that this isn't the
greatest strategy for high sized cost transforms. I'm
curious what kind of strategies other runners take?
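
The doubling strategy described in this message can be sketched as below. This is a simplified model and not the code of any actual runner: grow_bundle_size, simulated_runtime and the max_size cap are hypothetical names and assumptions added for illustration.

```python
from typing import Callable


def grow_bundle_size(run_bundle: Callable[[int], float],
                     target_runtime_s: float,
                     max_size: int = 1 << 20) -> int:
    """Double the bundle size until one bundle takes at least target_runtime_s."""
    size = 1
    while size < max_size and run_bundle(size) < target_runtime_s:
        size *= 2
    return size


def simulated_runtime(fixed_s: float, per_element_s: float) -> Callable[[int], float]:
    # Runtime of one bundle: a fixed setup cost plus a per-element cost.
    return lambda n: fixed_s + per_element_s * n


# No setup cost, 1 s per element, 10 minute target.
cheap_setup = grow_bundle_size(simulated_runtime(0.0, 1.0), target_runtime_s=600.0)
# 5 minutes of setup, 1 microsecond per element, same target.
costly_setup = grow_bundle_size(simulated_runtime(300.0, 1e-6), target_runtime_s=600.0)
```

In the second case the search only stops at the size cap, and every probing bundle on the way pays the full setup cost again, which is the weakness for transforms with a high fixed cost.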


Re: Runner Bundling Strategies

2023-09-22 Thread Jan Lukavský
Flink operators are long-running classes with a life-cycle of open() and 
close(), so any amortization can be done between those methods, see [1]. 
Essentially, in vanilla Flink the complete (unbounded) input can be viewed 
as a single "bundle". The crucial point is that state is 
checkpointed while this "bundle" is open.


 Jan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/internals/task_lifecycle/


On 9/22/23 15:21, Kenneth Knowles wrote:
What is the best way to amortize heavy operations across elements in 
Flink? (that is what bundles are for, basically)


On Fri, Sep 22, 2023 at 5:09 AM Jan Lukavský  wrote:

Flink defines bundles in terms of number of elements and
processing time, by default 1000 elements or 1000 milliseconds,
whichever happens first. But bundles are not a "natural" concept in
Flink; it uses them merely to comply with the Beam model. By
default, checkpoints are unaligned with bundles.

 Jan

On 9/22/23 01:58, Robert Bradshaw via dev wrote:

Dataflow uses a work-stealing protocol. The FnAPI has a protocol
to ask the worker to stop at a certain element that has already
been sent.

On Thu, Sep 21, 2023 at 4:24 PM Joey Tran
 wrote:

Writing a runner and the first strategy for determining
bundling size was to just start with a bundle size of one and
double it until we reach a size that we expect to take some
target per-bundle runtime (e.g. maybe 10 minutes). I realize
that this isn't the greatest strategy for high sized cost
transforms. I'm curious what kind of strategies other runners
take?


Re: Runner Bundling Strategies

2023-09-22 Thread Jan Lukavský
Flink defines bundles in terms of number of elements and processing 
time, by default 1000 elements or 1000 milliseconds, whichever happens 
first. But bundles are not a "natural" concept in Flink; it uses them 
merely to comply with the Beam model. By default, checkpoints are 
unaligned with bundles.
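
A bundle boundary defined by "N elements or T milliseconds, whichever happens first" can be modelled as in the sketch below. This is a toy Python model, not the FlinkRunner implementation: CountOrTimeBundler and its methods are hypothetical, and the clock is passed in explicitly so that the behaviour is deterministic.

```python
from typing import List


class CountOrTimeBundler:
    """Closes a bundle after max_size elements or max_millis of elapsed time."""

    def __init__(self, max_size: int = 1000, max_millis: int = 1000) -> None:
        self.max_size = max_size
        self.max_millis = max_millis
        self._current: List[object] = []
        self._opened_at = 0
        self.finished: List[List[object]] = []

    def add(self, element: object, now_millis: int) -> None:
        if not self._current:
            self._opened_at = now_millis  # a bundle starts with its first element
        self._current.append(element)
        self._maybe_finish(now_millis)

    def on_timer(self, now_millis: int) -> None:
        # Called periodically so that a slow trickle of input still gets flushed.
        self._maybe_finish(now_millis)

    def _maybe_finish(self, now_millis: int) -> None:
        if not self._current:
            return
        full = len(self._current) >= self.max_size
        expired = now_millis - self._opened_at >= self.max_millis
        if full or expired:
            self.finished.append(self._current)
            self._current = []


bundler = CountOrTimeBundler(max_size=3, max_millis=1000)
for i, t in enumerate([0, 10, 20, 30]):
    bundler.add(i, now_millis=t)      # the third element closes the first bundle
bundler.on_timer(now_millis=1500)     # the time limit closes the second bundle
```

The defaults of 1000 and 1000 mirror the numbers quoted in this message. Which pipeline options control them should be checked against the Flink runner documentation.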


 Jan

On 9/22/23 01:58, Robert Bradshaw via dev wrote:
Dataflow uses a work-stealing protocol. The FnAPI has a protocol to 
ask the worker to stop at a certain element that has already been sent.


On Thu, Sep 21, 2023 at 4:24 PM Joey Tran  
wrote:


Writing a runner and the first strategy for determining bundling
size was to just start with a bundle size of one and double it
until we reach a size that we expect to take some target
per-bundle runtime (e.g. maybe 10 minutes). I realize that this
isn't the greatest strategy for high sized cost transforms. I'm
curious what kind of strategies other runners take?


Re: Stateful Beam Job with Flink Runner - Checkpoint Size Increasing Over Time

2023-09-19 Thread Jan Lukavský

Hi,

Hemant, can you please share the code of the Pipeline? Do you use side 
inputs? Besides what Kenn already described:


> 2.  When is the state information cleared on the WindowDoFn (TUMBLE 
windows)  on window closure ? When will global states and timers get 
cleared?


The state and timers for windows are cleared using the cleaner created in the 
createWrappingDoFnRunner method [1]. The only exception is the global 
window, where the state and timers are cleared only on the final watermark 
using [2]. The reason is that otherwise Flink would accumulate window GC 
timers for all keys ever seen in the global window.
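
The cleanup behaviour described here can be pictured with the following toy model. It is a sketch under simplifying assumptions and not the FlinkRunner code: WindowedStateStore is a hypothetical class, a window is identified only by its end time, and the global window is modelled as a window that ends at the final watermark.

```python
from typing import Dict, Tuple

FINAL_WATERMARK = float("inf")  # stand-in for the end-of-input watermark


class WindowedStateStore:
    """Toy keyed state store that garbage-collects state per (key, window)."""

    def __init__(self, allowed_lateness: int = 0) -> None:
        self.allowed_lateness = allowed_lateness
        self.state: Dict[Tuple[str, float], int] = {}

    def add(self, key: str, window_end: float, value: int) -> None:
        # window_end == FINAL_WATERMARK models the global window.
        k = (key, window_end)
        self.state[k] = self.state.get(k, 0) + value

    def on_watermark(self, watermark: float) -> None:
        # A window is expired once the watermark reaches its end plus lateness.
        expired = [k for k in self.state
                   if k[1] + self.allowed_lateness <= watermark]
        for k in expired:
            del self.state[k]


store = WindowedStateStore()
store.add("k1", 100, 1)
store.add("k1", 200, 1)
store.add("k2", FINAL_WATERMARK, 1)
store.on_watermark(150)
after_first = set(store.state)
store.on_watermark(FINAL_WATERMARK)
```

State in ordinary windows disappears as the watermark advances, while global-window state stays until the final watermark. Anything that grows without bound in the global window therefore has to be cleaned up by user code.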


> 3.  Is timer and keystate information clearance by the following 
enough to not have ever increasing memory or checkpoints?
AFAIK, state and timers are correctly cleared at window GC time, so - 
because you see state increasing over time in the code path that corresponds 
to side inputs - I would suppose that your side input is what grows over 
time. Can you verify that? If that is the problem, you can try 
switching the state backend so that you don't have to keep it all in memory 
(RocksDB), or consider using a different technique for joining (merging) 
the two input streams (flatten and apply the joining logic yourself, including 
buffering).


Best,

 Jan

[1] 
https://github.com/apache/beam/blob/79f46e00184fc5fcea7c9c4a85e2ed8467ef1a71/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L351
[2] 
https://github.com/apache/beam/blob/34024902746af90e7bf41e28729ec031dbab58d2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L212


On 9/19/23 15:56, Kenneth Knowles wrote:
Caveat: it has been a long time and I don't really know the details of 
the FlinkRunner. But I can answer a couple questions.


On Fri, Sep 15, 2023 at 7:07 PM Hemant Kumar via dev 
 wrote:


Hi Team,

I am facing an issue running a stateful Beam job on Flink.

*Problem Statement:*
    A stateful Beam application with TUMBLE windows running on the Flink
Runner, whose checkpoint size increases consistently over time.

*Observation:*
   The memory usage keeps increasing over time and the pods get
OOM-killed (code 137) on Kubernetes.

*Version:*
    Beam version 2.32, Flink version 1.13.6, State backend -
EmbeddedRocksDB (com.ververica -  frocksdbjni - 6.20.3-ververica-2.0)

*Assumption:*
   State is never cleared in the state backend even when the window is
closed.

*Questions:*
  1. What is the significance of currentSideInputWatermark in
/org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator/
and how does it affect an application without side inputs?

https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L767


If you have a main input and a side input, each main input window is 
buffered until the side input is "ready" for that window to be 
processed. That particular line is about flushing all the rest of 
the data when the side input is fully ready and you are guaranteed to 
never see more data on the side input.
As for the rest of the questions, I don't know when the under-the-hood 
stuff is cleared out.
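
The push-back behaviour described in this reply can be sketched as follows. This is a toy Python model and not the DoFnOperator implementation: PushbackBuffer and its method names are hypothetical, and windows are identified by plain integers.

```python
from typing import List, Set, Tuple


class PushbackBuffer:
    """Toy model: main-input elements wait until their window's side input is ready."""

    def __init__(self) -> None:
        self.ready_windows: Set[int] = set()
        self.pushed_back: List[Tuple[int, str]] = []  # (window id, element)
        self.output: List[Tuple[int, str]] = []

    def on_main_element(self, window: int, element: str) -> None:
        if window in self.ready_windows:
            self.output.append((window, element))
        else:
            # Buffered elements are part of the operator state that is checkpointed.
            self.pushed_back.append((window, element))

    def on_side_input_ready(self, window: int) -> None:
        self.ready_windows.add(window)
        still_waiting = []
        for w, element in self.pushed_back:
            if w in self.ready_windows:
                self.output.append((w, element))
            else:
                still_waiting.append((w, element))
        self.pushed_back = still_waiting

    def on_side_input_final_watermark(self) -> None:
        # No more side-input data can arrive: flush whatever is still buffered.
        self.output.extend(self.pushed_back)
        self.pushed_back = []


buf = PushbackBuffer()
buf.on_main_element(1, "a")
buf.on_main_element(2, "b")
buf.on_side_input_ready(1)
waiting = list(buf.pushed_back)
buf.on_side_input_final_watermark()
```

In this model the buffer only shrinks when a side-input window becomes ready or the side input reaches its final watermark, so a side input that never becomes ready for some windows keeps the buffered elements around.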


Kenn

    Removing the check /if (currentSideInputWatermark >=
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())/ and calling
/emitAllPushedBackData();/ on every processWatermark reduces the
checkpoint size, which otherwise keeps increasing.

  2.  When is the state information cleared on the WindowDoFn
(TUMBLE windows)  on window closure ? When will global states and
timers get cleared?

  3.  Is timer and keystate information clearance by the following
enough to not have ever increasing memory or checkpoints?

*Flush on watermark:*


pushedBackElementsHandler.clear();


*Timer removal:*



keyedStateInternals.removeWatermarkHoldUsage(timer.getOutputTimestamp());


*Global removal:*


keyedStateInternals.clearGlobalState();


  -Hemant





Re: [ANNOUNCE] New committer: Ahmed Abualsaud

2023-08-25 Thread Jan Lukavský

Congrats Ahmed!

On 8/25/23 07:56, Anand Inguva via dev wrote:

Congratulations Ahmed :)

On Fri, Aug 25, 2023 at 1:17 AM Damon Douglas 
 wrote:


Well deserved! Congratulations, Ahmed! I'm so happy for you.

On Thu, Aug 24, 2023, 5:46 PM Byron Ellis via dev
 wrote:

Congratulations!

On Thu, Aug 24, 2023 at 5:34 PM Robert Burke
 wrote:

Congratulations Ahmed!!

On Thu, Aug 24, 2023, 4:08 PM Chamikara Jayalath via dev
 wrote:

Congrats Ahmed!!

On Thu, Aug 24, 2023 at 4:06 PM Bruno Volpato via dev
 wrote:

Congratulations, Ahmed!

Very well deserved!


On Thu, Aug 24, 2023 at 6:09 PM XQ Hu via dev
 wrote:

Congratulations, Ahmed!

On Thu, Aug 24, 2023, 5:49 PM Ahmet Altay via
dev  wrote:

Hi all,

Please join me and the rest of the Beam
PMC in welcoming a new committer: Ahmed
Abualsaud (ahmedabuals...@apache.org).

Ahmed has been part of the Beam community
since January 2022, working mostly on IO
connectors, made a large amount of
contributions to make Beam IOs more
usable, performant, and reliable. And at
the same time Ahmed was active in the user
list and at the Beam summit helping users
by sharing his knowledge.

Considering their contributions to the
project over this timeframe, the Beam PMC
trusts Ahmed with the responsibilities of
a Beam committer. [1]

Thank you Ahmed! And we are looking to see
more of your contributions!

Ahmet, on behalf of the Apache Beam PMC

[1]

https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer



Re: [VOTE] Release 2.49.0, release candidate #2

2023-07-13 Thread Jan Lukavský

+1 (binding)

Tested Java SDK with FlinkRunner.

 Jan

On 7/13/23 02:30, Bruno Volpato via dev wrote:

+1 (non-binding).

Tested with https://github.com/GoogleCloudPlatform/DataflowTemplates 
(Java SDK 11, Dataflow runner).


Thanks Yi!

On Tue, Jul 11, 2023 at 4:23 PM Yi Hu via dev  wrote:

Hi everyone,
Please review and vote on the release candidate #2 for the version
2.49.0, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


Reviewers are encouraged to test their own use cases with the
release candidate, and vote +1 if
no issues are found. Only PMC member votes will count towards the
final vote, but votes from all
community members are encouraged and helpful for finding
regressions; you can either test your own
use cases or use cases from the validation sheet [10].

The complete staging area is available for your review, which
includes:
* GitHub Release notes [1],
* the official Apache source release to be deployed to
dist.apache.org [2], which is signed with the key with
fingerprint either CB6974C8170405CB (y...@apache.org) or
D20316F712213422 (GitHub Action automated) [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "v2.49.0-RC2" [5],
* website pull request listing the release [6], the blog post [6],
and publishing the API reference manual [7].
* Java artifacts were built with Gradle GRADLE_VERSION and
OpenJDK/Oracle JDK JDK_VERSION.
* Python artifacts are deployed along with the source release to
the dist.apache.org [2] and PyPI [8].
* Go artifacts and documentation are available at pkg.go.dev [9]
* Validation sheet with a tab for 2.49.0 release to help with
validation [10].
* Docker images published to Docker Hub [11].
* PR to run tests against release branch [12].

The vote will be open for at least 72 hours. It is adopted by
majority approval, with at least 3 PMC affirmative votes.

For guidelines on how to try the release in your projects, check
out our blog post at /blog/validate-beam-release/.

Thanks,
Release Manager

[1] https://github.com/apache/beam/milestone/13
[2] https://dist.apache.org/repos/dist/dev/beam/2.49.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4]
https://repository.apache.org/content/repositories/orgapachebeam-1349/
[5] https://github.com/apache/beam/tree/v2.49.0-RC2
[6] https://github.com/apache/beam/pull/27374 (unchanged since RC1)
[7] https://github.com/apache/beam-site/pull/646  (unchanged since
RC1)
[8] https://pypi.org/project/apache-beam/2.49.0rc2/
[9]
https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.49.0-RC2/go/pkg/beam
[10]

https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=934901728
[11] https://hub.docker.com/search?q=apache%2Fbeam&type=image

[12] https://github.com/apache/beam/pull/27307

-- 


Yi Hu, (he/him/his)

Software Engineer



Re: [DISCUSS] Enable Github Discussions?

2023-07-04 Thread Jan Lukavský

-1

Totally agree with Byron and Alexey.

 Jan

On 7/3/23 21:18, Byron Ellis via dev wrote:
-1. This just leads to needless fragmentation not to mention being at 
the mercy of a specific technology provider.


On Mon, Jul 3, 2023 at 11:39 AM XQ Hu via dev  wrote:

+1 with GH discussion.
If Airflow can do this
https://github.com/apache/airflow/discussions, I think we can do
this as well.

On Mon, Jul 3, 2023 at 9:51 AM Alexey Romanenko
 wrote:

-1
I understand that for some people, who maybe are not very
familiar with the ASF and its “Apache Way” [1], it may sound a bit
obsolete, but mailing lists are one of the key things of every
ASF project, which Apache Beam is. Having user@, dev@
and commits@ lists is required for an ASF project to maintain
open discussions that are publicly accessible and archived
in the same way for all ASF projects.

I just wanted to remind everyone that a key motto at the Apache
Software Foundation is:
/“If it didn't happen on the mailing list, it didn't happen.”/
—
Alexey

[1] https://apache.org/theapacheway/index.html


On 1 Jul 2023, at 19:54, Anand Inguva via dev
 wrote:

+1 for GitHub discussions as well. But I am also a little
concerned about multiple places for discussions. As Danny
said, if we have a good plan on how to move forward on
how/when to archive the current mailing list, that would be
great.

Thanks,
Anand

On Sat, Jul 1, 2023, 3:21 AM Damon Douglas
 wrote:

I'm a very strong +1 for replacing the use of Email with
GitHub Discussions. Thank you for bringing this up.

On Fri, Jun 30, 2023 at 7:38 AM Danny McCormick via dev
 wrote:

Thanks for starting this discussion!

I'm a weak -1 for this proposal. While I think that
GH Discussions can be a good forum, I think most of
the things that Discussions do are covered by some
combination of the dev/user lists and GitHub issues,
and the net outcome of this will be creating one more
forum to pay attention to. I know in the past we've
had a hard time keeping up with Stack Overflow
questions for a similar reason. With that said, I'm
not opposed to trying it out and experimenting as
long as we have (a) clear criteria for understanding
if the change is effective or not (can be
subjective), (b) a clear idea of when we'd revisit
the discussion, and (c) a clear path to roll back the
decision without it being /too/ much work (this might
mean something like disabling future discussions and
keeping the history or somehow moving the history to
the dev or user list). If we do this, I also think we
should update
https://beam.apache.org/community/contact-us/ with a
clear taxonomy of what goes where (this is what I'm
unsure of today).

FWIW, if we were proposing cutting either the user
list or both the user and dev list in favor of
discussions, I would be +1. I do think the advantages
of discussions over email are real (threaded, easy to
convert to/from issues, markdown, one place for all
things Beam).

Thanks,
Danny

On Fri, Jun 30, 2023 at 10:23 AM Svetak Sundhar via
dev  wrote:

Hi all,

I wanted to start a discussion to gauge interest
on enabling Github Discussions in Apache Beam.

Pros:
+ GH Discussions allows for folks to get
unblocked on small/medium implementation blockers
(Google employees can often get this help by
scheduling a call with teammates whereas there is
a larger barrier for non-Google employees to get
this help).
+ On the above point, more visibility into the
development blockers that others have previously
faced.
+ GH Discussions is more discoverable and
approachable for new users and contributors.
+ A centralized place to have discussions. Long
term, it makes sense to eventually fully migrate
to GH Discussions.

Cons:
- For a period of time when we use both the dev
  

Re: Is Flink >1.14 really supported by the runners?

2023-06-13 Thread Jan Lukavský

Probably better for dev@ <mailto:dev@beam.apache.org> (added).

 Jan

On 6/13/23 12:43, Edgar H wrote:

Got you, thanks!

Are there any plans on supporting 1.17 anytime soon too?

On Tue, Jun 13, 2023, 12:27, Jan Lukavský wrote:

Hi Edgar,

the website seems to have mistakenly not been updated when the support
for these Flink versions was added. This should be fixed [1], the runner
is stable on versions up to 1.16.

Regarding Flink Operator, I'm not 100% familiar with it, but given that
a Beam Pipeline is translated into a standard Flink application, Beam
should be able to run on the operator just like any other application.

Best,

  Jan

[1] https://github.com/apache/beam/issues/27115

On 6/13/23 12:10, Edgar H wrote:
> Hi all,
>
> I've been running Beam with Flink in 1.14 for quite some time now but
> just seen in Maven that runners supporting 1.15 and 1.16 are available
> to use, however no mention of them within the compatibility matrix.
>
> Are they stable enough to be used?
>
> And also, https://issues.apache.org/jira/browse/BEAM-14538 seeing this
> issue and wondering, does Beam really need to support the operator
> usage or is it something not related to?
>
> Thanks!


Re: Watermark Alignment on Flink Runner's UnboundedSourceWrapper

2023-05-23 Thread Jan Lukavský
Yes, FlinkRunner supports Beam's event-time semantics without any 
additional configuration options.


 Jan

On 5/23/23 09:52, Talat Uyarer via dev wrote:

Hi Jan,

Yes, my plan is to implement this feature in FlinkRunner. I have one 
more question. Does Flink Runner support EventTime or a 
Beam custom watermark? Do I need to set AutoWatermarkInterval for 
stateful Beam Flink jobs, or can Beam timers handle it without setting 
that param?


Thanks

On Tue, May 23, 2023 at 12:03 AM Jan Lukavský  wrote:

Hi Talat,

your analysis is correct, aligning watermarks for jobs with high
watermark skew in input partitions really results in faster
checkpoints and reduces the size of state. There are generally two
places you can implement this - in user code (the source) or
inside the runner. The user code can use some external synchronization
(e.g. ZooKeeper) to keep track of the progress of all individual
sources. Another option is to read the watermark from Flink's REST
API (some inspiration here [1]).

Another option would be to make use of [2] and implement this
directly in FlinkRunner. I'm not familiar with the possible
limitations of this; it was added to Flink quite recently (we
would have to support this only when running on Flink 1.15+).

If you would like to go for the second approach, I'd be happy to
help with some guidance.

Best,

 Jan

[1]

https://github.com/O2-Czech-Republic/proxima-platform/blob/master/flink/utils/src/main/java/cz/o2/proxima/flink/utils/FlinkGlobalWatermarkTracker.java

[2]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources


On 5/23/23 01:05, Talat Uyarer via dev wrote:

Maybe the User list does not have knowledge about this. That's
why I also resend on the Dev list. Sorry for cross posting


Hi All,

I have a stream aggregation job which reads from Kafka and writes
to some sinks.

When I submit my job, the Flink checkpoint size keeps increasing if I
use unaligned checkpoint settings and it does not emit any window
results.
If I use aligned checkpoints, the size is somewhat under
control (still big), but checkpoint alignment takes a long time.

I would like to implement something similar to [1]. I believe that
if UnboundedSourceWrapper pauses reading partitions whose watermark
is ahead, it will reduce the size of the checkpoint and I can
use unaligned checkpointing. What do you think about this
approach? Do you have another solution?

One more question: I was reading code to implement the above
idea. I saw this code [2]. Does Flink Runner have a similar
implementation?

Thanks

[1] https://github.com/apache/flink/pull/11968

[2]

https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207



Re: Watermark Alignment on Flink Runner's UnboundedSourceWrapper

2023-05-23 Thread Jan Lukavský

Hi Talat,

your analysis is correct, aligning watermarks for jobs with high 
watermark skew in input partitions really results in faster checkpoints 
and reduces the size of state. There are generally two places you can 
implement this - in user code (the source) or inside the runner. The user 
code can use some external synchronization (e.g. ZooKeeper) to keep 
track of the progress of all individual sources. Another option is to read 
the watermark from Flink's REST API (some inspiration here [1]).


Another option would be to make use of [2] and implement this directly 
in FlinkRunner. I'm not familiar with the possible limitations of this; 
it was added to Flink quite recently (we would have to support this 
only when running on Flink 1.15+).
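
The idea behind watermark alignment can be sketched as follows. This is only an illustration of the concept and uses neither Flink's nor Beam's API: the function sources_to_pause, the source names and the max_drift parameter are hypothetical.

```python
from typing import Dict, Set


def sources_to_pause(watermarks: Dict[str, int], max_drift: int) -> Set[str]:
    """Return the sources whose watermark is more than max_drift ahead of the slowest."""
    if not watermarks:
        return set()
    slowest = min(watermarks.values())
    return {name for name, wm in watermarks.items() if wm - slowest > max_drift}


# Partition p2 is far ahead of the others and would stop reading for now.
paused = sources_to_pause({"p0": 1_000, "p1": 1_200, "p2": 9_000}, max_drift=500)
```

A paused source resumes once the slowest watermark has caught up to within max_drift. This bounds how much data from fast partitions has to be buffered in state while waiting for slow ones.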


If you would like to go for the second approach, I'd be happy to help 
with some guidance.


Best,

 Jan

[1] 
https://github.com/O2-Czech-Republic/proxima-platform/blob/master/flink/utils/src/main/java/cz/o2/proxima/flink/utils/FlinkGlobalWatermarkTracker.java
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources


On 5/23/23 01:05, Talat Uyarer via dev wrote:
Maybe the User list does not have knowledge about this. That's why I 
also resend on the Dev list. Sorry for cross posting



Hi All,

I have a stream aggregation job which reads from Kafka and writes to 
some sinks.


When I submit my job, the Flink checkpoint size keeps increasing if I use 
unaligned checkpoint settings and it does not emit any window results.
If I use aligned checkpoints, the size is somewhat under control (still 
big), but checkpoint alignment takes a long time.


I would like to implement something similar to [1]. I believe that 
if UnboundedSourceWrapper pauses reading partitions whose watermark is 
ahead, it will reduce the size of the checkpoint and I can use unaligned 
checkpointing. What do you think about this approach? Do you have 
another solution?


One more question: I was reading code to implement the above idea. I 
saw this code [2]. Does Flink Runner have a similar implementation?


Thanks

[1] https://github.com/apache/flink/pull/11968
[2] 
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207

Re: [VOTE] Release 2.47.0, release candidate #3

2023-05-10 Thread Jan Lukavský

+1 (binding)

Tested with Java SDK and FlinkRunner.

 Jan

On 5/9/23 08:44, Chamikara Jayalath via dev wrote:

Verified that new containers are valid. Changing my vote to +1

Thanks for fixing this Jack.

- Cham

On Mon, May 8, 2023 at 2:05 PM Jack McCluskey  
wrote:


I've spent the day putting together an environment on a debian
bullseye container to re-build containers with a matching Glibc
version. The Java, Go, Python, and Typescript containers have all
been re-built and pushed to Docker Hub. The underlying code did
not change, which fortunately means we can dodge having to build
an RC4 to fix this issue.

The GCR copy of the Go container has already been updated, while
the Java and Python containers are currently being copied over.

On Mon, May 8, 2023 at 11:16 AM Robert Bradshaw
 wrote:

Thanks for catching this. This does seem severe enough that we
need to fix it before the release.

On Sat, May 6, 2023 at 10:15 PM Chamikara Jayalath via dev
 wrote:

Seems like Python SDK harness containers built for the
current RC are broken. Please see
https://github.com/apache/beam/issues/26576 for updates.

-1 for the current vote due to this.

Seems like this can be addressed by reverting
https://github.com/apache/beam/pull/26054 and re-building
the containers.

Thanks,
Cham

On Sat, May 6, 2023 at 8:00 AM Svetak Sundhar
 wrote:

+1 (Non-Binding)

I tested Python Quick Start on Dataflow Runner as well





Svetak Sundhar

  Technical Solutions Engineer, Data

svetaksund...@google.com



On Sat, May 6, 2023 at 4:44 AM Chamikara Jayalath via
dev  wrote:

I'm seeing a regression when running Java x-lang
jobs using the RC. Created
https://github.com/apache/beam/issues/26576.

Thanks,
Cham

On Fri, May 5, 2023 at 11:11 PM Austin Bennett
 wrote:

+1 ( non-binding )

On Fri, May 5, 2023 at 10:49 PM Jean-Baptiste
Onofré  wrote:

+1 (binding)

Regards
JB

On Fri, May 5, 2023 at 4:52 AM Jack
McCluskey via dev  wrote:

Hi everyone,

Please review and vote on the release
candidate #3 for the version 2.47.0,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release
(please provide specific comments)

Reviewers are encouraged to test their
own use cases with the release
candidate, and vote +1 if no issues
are found. *Non-PMC members are
allowed and encouraged to vote. Please
help validate the release for your use
case!*

The complete staging area is available
for your review, which includes:
* GitHub Release notes [1],
* the official Apache source release
to be deployed to dist.apache.org
 [2], which
is signed with the key with
fingerprint DF3CBA4F3F4199F4 [3],
* all artifacts to be deployed to the
Maven Central Repository [4],
* source code tag "v2.47.0-RC3" [5],
* website pull request listing the
release [6], the blog post [6], and
publishing the API reference manual [7].
* Java artifacts were built with
Gradle 7.5.1 and OpenJDK/Oracle JDK
8.0.322.
* Python artifacts are deployed along
with the source release to the
dist.apache.org
 [2] and PyPI[8].
* Go artifacts and documentation are
  

Re: Thoughts on coder evolution

2023-05-04 Thread Jan Lukavský

On 5/3/23 19:57, Kenneth Knowles wrote:
My big picture hot take: this is useful, but the problem we really 
need to solve is topology change, which will obsolete coder evolution.


I think the Beam model has a role in this. It isn't just a runner-specific 
thing. We need to ensure the model makes it possible/easy to replace 
one pipeline with another, and to define how that should look. For 
example, a composite PTransform with the same input/output types 
should have its internals replaced in some blue/green way in some 
cases. This would of course include lots of coder changes without any 
notion of compatibility possible. And for a pipeline that looks pretty 
much the same except for some encoding change, we should definitely 
see if we can define a more localized migration process.
Yes, a change of coder can be viewed as a simplistic change in topology 
(if we assume that a coder is a property of the edge connecting two nodes). In 
this sense, if we were to provide a way to upgrade topology, then this 
would be solved as well. On the other hand, I'm a little afraid that 
a fully generic change in topology is unachievable, because the state 
stored in the previous Pipeline might not have any relation to what 
should be stored in the upgraded Pipeline. Yes, there can be (probably 
many) cases where this would be possible.


I haven't thought about this in a while so I don't have technical 
proposals for achieving it yet. Ultimately as a user of Beam and/or 
any runner I would consider being able to run a brand new unrelated 
pipeline and hot swap it for the live one to be a prerequisite to 
production-readiness, but it has been a while since this was what I 
did day-to-day.
Agreed, the ability to do this is indeed necessary. The argument is that 
this might be costly. It would help if we provided a way to bootstrap 
state for a streaming Pipeline via a batch Pipeline (and even better, in 
a runner-agnostic way, so that users could use different runners for 
both). But this is a completely different topic. :-)


Returning to my original motivation - all this would be a lot of 
work, so I think it is reasonable to propose a shortcut - deprecate 
KryoCoder and introduce Kryo5Coder (or wait for Kryo 6) as an 
alternative and let users handle the transition themselves.


 Jan



Kenn

On Wed, May 3, 2023 at 8:07 AM Byron Ellis via dev 
 wrote:


I think I'm not understanding the use case here? Are we talking
about encoding of data in motion (e.g. between stages of a
streaming pipeline) or data at rest? (e.g. input formats and
output formats) Or maybe something else?

This is both data in motion (between stages of Pipeline) and at rest, 
but inside the Pipeline (state). Not input/output formats.



On Wed, May 3, 2023 at 6:58 AM Jan Lukavský  wrote:

Hi,

I'd like to discuss a topic that from time to time appears in
different contexts (e.g. [1]). I'd like to restate the problem in a
slightly more generic way as: "Should we have a way to completely
exchange the coder of a PCollection/state of a _running_ Pipeline?".
First my motivation for this question - Beam has an extension called
beam-sdks-java-extensions-kryo, which contains a KryoCoder. This
coder uses Kryo [2] to serialize virtually any Java class into
binary format. Unfortunately, this binary representation differs
between Kryo versions and it does not contain any way to recognize
which version of Kryo was used to serialize the data. An attempt to
deserialize bytes produced by an incompatible version of Kryo results
in an exception. The current version of Kryo that is used by the
KryoCoder is already more than 5 years old and an upgrade to a newer
version is needed, because the current version does not work with
JDK17+ [3]. Thus, the only option seems to be the creation of a
different Coder (e.g. Kryo5Coder), but then we need the ability to
transfer Pipelines using the old KryoCoder to the newer one. That
is, we need to completely switch the coder that encodes a
PCollection and/or state.

We have therefore the following options:

  1) Simply ignore this and let users rerun the Pipeline from
scratch. This is always possible, but, if nothing else, for some
Pipelines it might be costly to reprocess all historical data.

  2) We can create the new Coder and let users use a
runner-specific way to convert the Pipeline. E.g. in case of Flink,
this could be done by converting savepoint into the new format.
This requires knowledge of how Beam stores state (namespaces) and
is kind of involved on the user side.
We could probably provide runner-spec

Thoughts on coder evolution

2023-05-03 Thread Jan Lukavský

Hi,

I'd like to discuss a topic that appears from time to time in different 
contexts (e.g. [1]). I'd like to restate the problem in a slightly more 
generic way as: "Should we have a way to completely exchange the coder 
of a PCollection/state of a _running_ Pipeline?". First, my motivation 
for this question: Beam has an extension called 
beam-sdks-java-extensions-kryo, which contains a KryoCoder. This coder 
uses Kryo [2] to serialize virtually any Java class into a binary 
format. Unfortunately, this binary representation differs between Kryo 
versions and it does not contain any way to recognize which version of 
Kryo was used to serialize the data. An attempt to deserialize bytes 
produced by an incompatible version of Kryo results in an exception. The 
current version of Kryo that is used by the KryoCoder is already more 
than 5 years old and an upgrade to a newer version is needed, because 
the current version does not work with JDK17+ [3]. Thus, the only option 
seems to be the creation of a different Coder (e.g. Kryo5Coder), but 
then we need the ability to transfer Pipelines using the old KryoCoder 
to the newer one. That is, we need to completely switch the coder that 
encodes a PCollection and/or state.
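
To make the missing piece concrete, here is a minimal sketch of a 
version-tagged envelope: one byte naming the serialization format 
precedes the payload, so a reader can pick the matching decoder instead 
of failing. This is not how KryoCoder works today. The class 
VersionedEnvelope and its format numbers are hypothetical, and UTF-8 
and UTF-16 text merely stand in for two incompatible Kryo formats.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical envelope layout: [1 byte format version][payload bytes]. */
final class VersionedEnvelope {
  static final byte FORMAT_V1 = 1; // stands in for the old serializer
  static final byte FORMAT_V2 = 2; // stands in for the new, incompatible serializer

  private VersionedEnvelope() {}

  /** Encodes the value in the requested format and prefixes the version byte. */
  static byte[] encode(String value, byte version) {
    byte[] payload = payloadBytes(value, version);
    byte[] out = new byte[payload.length + 1];
    out[0] = version;
    System.arraycopy(payload, 0, out, 1, payload.length);
    return out;
  }

  /** Reads the version byte first, then decodes the rest with the matching format. */
  static String decode(byte[] bytes) {
    if (bytes.length == 0) {
      throw new IllegalArgumentException("Empty input, no version byte");
    }
    byte[] payload = Arrays.copyOfRange(bytes, 1, bytes.length);
    switch (bytes[0]) {
      case FORMAT_V1:
        return new String(payload, StandardCharsets.UTF_8);
      case FORMAT_V2:
        return new String(payload, StandardCharsets.UTF_16BE);
      default:
        throw new IllegalArgumentException("Unknown format version " + bytes[0]);
    }
  }

  private static byte[] payloadBytes(String value, byte version) {
    switch (version) {
      case FORMAT_V1:
        return value.getBytes(StandardCharsets.UTF_8);
      case FORMAT_V2:
        return value.getBytes(StandardCharsets.UTF_16BE);
      default:
        throw new IllegalArgumentException("Unknown format version " + version);
    }
  }
}
```

A tag like this only helps data written after it is introduced. Bytes 
already produced by the untagged coder would still need one of the 
migration options discussed in this message.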


We have therefore the following options:

 1) Simply ignore this and let users rerun the Pipeline from scratch. 
This is possible and should essentially always be applicable, but, if 
nothing else, for some Pipelines it might be costly to reprocess all 
historical data.


 2) We can create the new Coder and let users use a runner-specific way 
to convert the Pipeline. E.g. in case of Flink, this could be done by 
converting savepoint into the new format. This requires knowledge of how 
Beam stores state (namespaces) and is kind of involved on the user side. 
We could probably provide runner-specific tools for this, but some 
runners, in general, might not allow such state manipulation.


 3) We can include the information of a Coder update into the Pipeline 
and resubmit it to the runner and let the runner handle it. Upon 
Pipeline restart, a runner would have to convert all state and all 
inflight data from the old Coder to the new one, before resuming the 
Pipeline.


Option 3) seems like the most natural, but it requires support on the 
runner side.
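
As a rough model of option 3), the sketch below re-encodes every 
stored value by decoding it with the old coder and encoding it with the 
new one before processing resumes. It is plain Java, not Beam or runner 
API: SimpleCoder, StateMigration and the in-memory map that stands in 
for runner-managed state are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a coder; this is not Beam's Coder class. */
interface SimpleCoder<T> {
  byte[] encode(T value);

  T decode(byte[] bytes);
}

final class StateMigration {
  private StateMigration() {}

  /** Decodes each stored value with the old coder and re-encodes it with the new one. */
  static <T> Map<String, byte[]> migrate(
      Map<String, byte[]> oldState, SimpleCoder<T> oldCoder, SimpleCoder<T> newCoder) {
    Map<String, byte[]> migrated = new HashMap<>();
    for (Map.Entry<String, byte[]> entry : oldState.entrySet()) {
      T value = oldCoder.decode(entry.getValue());
      migrated.put(entry.getKey(), newCoder.encode(value));
    }
    return migrated;
  }

  /** "Old" format used in this example: the number as decimal text. */
  static final SimpleCoder<Long> TEXT_LONG =
      new SimpleCoder<Long>() {
        @Override
        public byte[] encode(Long value) {
          return Long.toString(value).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Long decode(byte[] bytes) {
          return Long.parseLong(new String(bytes, StandardCharsets.UTF_8));
        }
      };

  /** "New" format used in this example: 8 bytes, big-endian. */
  static final SimpleCoder<Long> BINARY_LONG =
      new SimpleCoder<Long>() {
        @Override
        public byte[] encode(Long value) {
          return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        }

        @Override
        public Long decode(byte[] bytes) {
          return ByteBuffer.wrap(bytes).getLong();
        }
      };
}
```

A real runner would additionally have to convert in-flight data and do 
this per state namespace, which is exactly the part left open here.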


I leave the details of how a runner would do this open; for now I'm 
interested in knowing the community's position on this.


 Jan

[1] https://lists.apache.org/thread/z2m1hg4l5k2kb7nhjkv2lnwf8g4t9wps

[2] https://github.com/EsotericSoftware/kryo

[3] https://github.com/EsotericSoftware/kryo/issues/885



Re: [ANNOUNCE] New committer: Damon Douglas

2023-04-25 Thread Jan Lukavský

Congrats Damon!

On 4/25/23 06:15, Alex Kosolapov wrote:


Congratulations, Damon!

*From: *Kenneth Knowles 
*Reply-To: *"dev@beam.apache.org" 
*Date: *Monday, April 24, 2023 at 12:52 PM
*To: *"dev@beam.apache.org" 
*Subject: *[EXTERNAL] [ANNOUNCE] New committer: Damon Douglas

Hi all,

Please join me and the rest of the Beam PMC in welcoming a new 
committer: Damon Douglas (damondoug...@apache.org)


Damon has contributed widely: Beam Katas, playground, infrastructure, 
and many IO connectors. Damon does lots of code review in addition to 
code. (yes, you can review code as a non-committer!)


Considering their contributions to the project over this timeframe, 
the Beam PMC trusts Damon with the responsibilities of a Beam 
committer. [1]


Thank you Damon! And we are looking to see more of your contributions!

Kenn, on behalf of the Apache Beam PMC

[1]
https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-21 Thread Jan Lukavský
Absolutely agree this is not something that should be part of the model. 
ResourceHints is a good place, but given how a Pipeline might get fused 
(and though this might be under the control of a runner, basically all 
runners use the same code, because there is currently no reason why this 
should be runner-specific), there is a problem with how to resolve 
conflicting settings. Also, it is somewhat questionable whether 
parallelism is a "resource". It feels more like a runtime property. I 
tend to think that FlinkPipelineOptions could be a good place for that, 
because this seems to apply (mostly) to the Flink batch runner.


On 4/21/23 19:43, Robert Bradshaw via dev wrote:
+1 to not requiring details like this in the Beam model. There is, 
however, the question of how to pass such implementation-detail 
specific hints to a runner that requires them. Generally that's done 
via ResourceHints or annotations, and while the former seems a good 
fit it's primarily focused on setting up the right context for user 
code (which GBK is not).


A complete hack is to add an experiment like 
flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do 
something cleaner.
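
For illustration only, the sketch below shows how a runner could pick 
such an experiment string out of a list of experiments. The experiment 
name is the hack floated above and does not exist in Beam; the class 
StageParallelismExperiment is equally hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Parses hypothetical "flink_parallelism_for_stage=STAGE_NAME:value" experiments. */
final class StageParallelismExperiment {
  static final String PREFIX = "flink_parallelism_for_stage=";

  private StageParallelismExperiment() {}

  /** Returns stage name to parallelism, ignoring unrelated experiments. */
  static Map<String, Integer> parse(List<String> experiments) {
    Map<String, Integer> result = new LinkedHashMap<>();
    for (String experiment : experiments) {
      if (!experiment.startsWith(PREFIX)) {
        continue; // some other experiment, not ours
      }
      String spec = experiment.substring(PREFIX.length());
      // Split on the last ':' so that stage names may themselves contain ':'.
      int separator = spec.lastIndexOf(':');
      if (separator <= 0 || separator == spec.length() - 1) {
        throw new IllegalArgumentException("Expected STAGE_NAME:value but got: " + spec);
      }
      int parallelism = Integer.parseInt(spec.substring(separator + 1));
      if (parallelism < 1) {
        throw new IllegalArgumentException("Parallelism must be positive: " + spec);
      }
      result.put(spec.substring(0, separator), parallelism);
    }
    return result;
  }
}
```

This does not address the harder problem raised in this thread: stage 
names are only known after fusion, so the user has little to key on.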



On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user 
 wrote:


Hi Jan,

To generalize the per-stage parallelism configuration, we should
have a FR proposing the capability to explicitly set autoscaling
(in this case, fixed size per stage) policy in Beam pipelines.

Per-step or per-stage parallelism, or fusion/optimization is not
part of the Beam model. They are [Flink] runner implementation
details and should be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion
looks like until the pipeline is submitted to a runner, thus
making configuration of the parallelism/worker-per-stage not
straightforward.
Flink's parallelism settings can be found here

<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
it's still kind of a black box since you don't really know how
many tasks are actually spawned until you run a pipeline.

That being said, if we have a general interface controlling how a
pipeline scales, each runner could adapt [auto]scaling in their
own way.
For example, in a Flink job, each operator/stage's task slot is
prorated by their key numbers; the maximum parallelism is
throttled by task slot utilization.
Another example, in a Dataflow job, each stage horizontally scales
by CPU utilization; vertically scales by memory/disk utilization.

+dev@beam.apache.org <mailto:dev@beam.apache.org>
Let's use this thread to discuss how to configure a pipeline for
runners so that they can scale workers appropriately without
exposing runner-specific details to the Beam model.

Ning.


On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský  wrote:

Hi Ning,

I might have missed that in the discussion, but we are talking about
batch execution, am I right? In streaming, all operators
(PTransforms) of a Pipeline are run in the same slots, thus
the downsides are limited. You can enforce streaming mode
using the --streaming command-line argument. But yes, this might
have other implications. For batch only, it obviously makes
sense to limit parallelism of a (fused) 'stage', which is not
a transform-level concept, but rather a more complex union of
transforms divided by a shuffle barrier. Would you be willing to
start a follow-up thread on the dev@ mailing list for a
deeper discussion?

 Jan

On 4/20/23 19:18, Ning Kang via user wrote:

Hi Jan,

The approach works when your pipeline doesn't have too many
operators. And the operator that needs the highest
parallelism can only use at most #total_task_slots /
#operators resources available in the cluster.

Another downside is wasted resources for other smaller
operators who cannot make full use of task slots assigned to
them. You might see only 1/10 tasks running while the other
9/10 tasks idle for an operator with parallelism 10,
especially when it's doing some aggregation like a SUM.

One redeeming method is that, for operators following another
operator with high fanout, we can explicitly add a Reshuffle
to allow a higher parallelism. But this circles back to the
first downside: if your pipeline has exponentially high
fanout through it, setting a single parallelism for the whole
pipeline is not ideal because it limits the scalability of
your pipeline significantly.

Ning.


On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský
 wrote:

Hi,

this topic was discussed many years ago and the
conclusion there was that s

Re: [ANNOUNCE] New committer: Anand Inguva

2023-04-21 Thread Jan Lukavský

Congrats Anand!

On 4/21/23 20:05, Robert Burke wrote:

Congratulations Anand!

On Fri, Apr 21, 2023, 10:55 AM Danny McCormick via dev 
 wrote:


Woohoo, congrats Anand! This is very well deserved!

On Fri, Apr 21, 2023 at 1:54 PM Chamikara Jayalath
 wrote:

Hi all,

Please join me and the rest of the Beam PMC in welcoming a new
committer: Anand Inguva (ananding...@apache.org)

Anand has been contributing to Apache Beam for more than a
year and  authored and reviewed more than 100 PRs. Anand has
been a core contributor to Beam Python SDK and drove the
efforts to support Python 3.10 and Python 3.11.

Considering their contributions to the project over this
timeframe, the Beam PMC trusts Anand with the responsibilities
of a Beam committer. [1]

Thank you Anand! And we are looking to see more of your
contributions!

Cham, on behalf of the Apache Beam PMC

[1]

https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer




Re: [DISCUSS] @Experimental, @Internal, @Stable, etc annotations

2023-04-03 Thread Jan Lukavský

Hi,

removing @Experimental and adding explicit @Stable annotation makes 
sense to me. FWIW, when we were designing Euphoria API, we adopted the 
following convention:


 - the default stability of "evolving", @Experimental for really 
experimental code [1]


 - target @Audience of API [2] (pipeline author, runner, internal, test)

 - and @StateComplexity of operators (PTransforms) [3]

The last part is something that was planned to be used by tools that can 
analyze the Pipeline for performance or visualize which transform(s) are 
most state-consuming. But this never went beyond plans. :)


 Jan

[1] 
https://github.com/apache/beam/blob/master/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/annotation/stability/Experimental.java


[2] 
https://github.com/apache/beam/blob/master/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/annotation/audience/Audience.java


[3] 
https://github.com/apache/beam/blob/master/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/annotation/operator/StateComplexity.java



On 3/31/23 23:05, Kenneth Knowles wrote:

Hi all,

Long ago, we adopted two annotations in Beam to communicate to users:

 - `@Experimental` indicates that an API might change
 - `@Internal` indicates that an API is not meant for users.

I've seen some real problems with this approach:

 - Users are afraid to use `@Experimental` APIs, because they are 
worried they are not production-ready. But it really just means they 
might change, and has nothing to do with that.
 - People write new code and do not put `@Experimental` annotations on 
it, even though it really should be able to change for a while, so we 
can do a good job.
 - I'm seeing a culture of being afraid to change things, even when it 
would be good for users, because our API surface area is far too large 
and not explicitly chosen.
 - `@Internal` is not that well-known. And now we have many target 
audiences: Beam devs, PTransform devs, tool devs, pipeline authors. 
Some of them probably want to use `@Internal` stuff!


I looked at a couple of sibling projects, Flink and Spark, and at what 
they have (links below).

They have many more tags, and some of them seem to have reverse 
defaults to Beam.


Flink: 
https://github.com/apache/flink/tree/master/flink-annotations/src/main/java/org/apache/flink/annotation


 - Experimental
 - Internal
 - Public
 - PublicEvolving
 - VisibleForTesting

Spark: 
https://github.com/apache/spark/tree/master/common/tags/src/main/java/org/apache/spark/annotation and 
https://github.com/apache/spark/tree/master/common/tags/src/main/scala/org/apache/spark/annotation


 - AlphaComponent
 - DeveloperApi
 - Evolving
 - Experimental
 - Private
 - Stable
 - Unstable
 - Since

I think it would help users to understand Beam with some simple, 
though possibly large-scale changes. My goal would be:


 - new code is changeable/evolving by default (so we don't have to 
always remember to annotate it) but users have confidence they can use 
it in production (because we have good software engineering practices)

 - Experimental would be reserved for more risky things
 - after we are confident an API is stable, because it has been the 
same across a couple releases, we mark it


A concrete proposal to achieve this would be:

 - Add a @Stable annotation and use it as appropriate on our primary APIs
 - [Possibly] add an @Evolving annotation that would also be the default.
 - Remove most `@Experimental` annotations or change them to `@Evolving`
 - Communicate about this (somehow). If possible, surface the 
`@Evolving` default in documentation.


The last bit is the hardest.
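
A minimal sketch of what the proposed annotations could look like in 
Java, assuming "evolving" is the implicit default for anything that is 
not annotated. The names Stable and Evolving follow the proposal above; 
PrimaryApi, NewlyAddedApi and the Stability.of helper are hypothetical 
and exist only for illustration.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Marks an API that is not expected to change incompatibly. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface Stable {}

/** Optional explicit marker; unannotated code is treated the same way. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface Evolving {}

/** Hypothetical long-standing API that has been explicitly marked. */
@Stable
class PrimaryApi {}

/** Hypothetical new code: no annotation, so it is evolving by default. */
class NewlyAddedApi {}

final class Stability {
  private Stability() {}

  /** Returns "stable" only for explicitly marked types, "evolving" otherwise. */
  static String of(Class<?> type) {
    return type.isAnnotationPresent(Stable.class) ? "stable" : "evolving";
  }
}
```

RUNTIME retention is chosen here only so that the default can be 
checked reflectively; documentation tooling could read the annotations 
from sources or class files instead.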

Kenn


Re: [DESIGN] Beam Triggered side input specification

2023-03-29 Thread Jan Lukavský
> Well yes it was (though as mentioned before, the fact that none of 
these designs were even written into the spec is a problem), though in 
some ways not a great one. The only global synchronization method we had 
was the watermark/end of window, so if the source PCollection was 
triggered by something else we lost that. This creates some unfortunate 
situations (in particular I would not recommend using distributed 
Map-valued side inputs with an early trigger - the behavior is probably 
not what one expects). Part of the problem is that triggers themselves 
are non-deterministic. Something like retractions would make this better 
but not completely. Something better here would be great, but I'm still 
not sure what it would be or if any of our runners could implement it.


Yes, the problem is due to the fact that triggers fire at 
non-deterministic event times, because most of Beam's current triggers 
are either processing-time triggers or data-driven triggers. We could 
obtain the same behavior as for the end-of-window trigger with 
event-time triggers (the EOW trigger and GC trigger are AFAIK the only 
event-time triggers we currently have). These might be useful on their 
own, but would also require somewhat more complicated logic in GBK 
(splitting window into panes, holding state for each pane independently, 
merging state for accumulating triggers, ...).


  Jan



On 3/28/23 17:26, Reuven Lax via dev wrote:



On Tue, Mar 28, 2023 at 12:39 AM Jan Lukavský  wrote:


On 3/27/23 19:44, Reuven Lax via dev wrote:



On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský  wrote:

Hi,

I'd like to clarify my understanding. Side inputs generally
perform a left (outer) join, LHS side is the main input, RHS
is the side input.


Not completely - it's more of what I would call a nested-loop
join. I.e. if the side input changes _nothing happens_ until a
new element arrives on the LHS. This isn't quite the same as a
left-outer join.

+1. This makes sense, my description was a slight simplification.


Doing streaming left join requires watermark synchronization,
thus elements from the main input are buffered until
main_input_timestamp > side_input_watermark. When side input
watermark reaches max watermark, main inputs do not have to
be buffered because the side input will not change anymore.
This works well for cases when the side input is bounded or
in the case of "slowly changing" patterns (ideally with
perfect watermarks, so no late data present).


This is true for non-triggered side inputs. Triggered side inputs
have always been different - the main-input elements are buffered
until the first triggered value of the side input is available.

I walked again through the code in
SimplePushBackSideInputDoFnRunner and looks like this is correct,
the runner actually does not wait for watermark, but for "ready
windows", which implies what you say. With a suitable trigger
(AfterWatermark.pastEndOfWindow()) this coincides with the
watermark of end of the window.


Allowing arbitrary changes in the side input (with arbitrary
triggers) might introduce additional questions - how to
handle late data in the side input? Full implementation would
require retractions. Dropping late data does not feel like a
solution, because then the pipeline would not converge to the
"correct" solution, as the side input might hold incorrect
value forever. Applying late data from the processing time
the DoFn receives them could make the downstream processing
unstable, restarting the pipeline on errors might change what
is "on time" and what is late thus generate inconsistent
different results.

BTW, triggered side inputs have always been available. The
problem Kenn is addressing is that nobody has ever written down
the spec! There was a spec in mind when they were implemented,
but the fact that this was not written has always been
problematic (and especially so when creating the portable runner).

Triggered side inputs have always had some
non-deterministic behavior, not just for late data. Side inputs
are cached locally on the reader, so different reading workers
might have different views on what the latest trigger was.

Makes sense, is this a design decision? I can imagine that waiting
for side input watermark unconditionally adds latency, on the
other hand an "unexpected" non-deterministic behavior can confuse
users. This type of non-determinism after pipeline failure and
recovery is among the hardest to debug. If we would document (and
possibly slightly reimplement) the triggered side-input spec,
could we add (optional) way to make the processing deterministic
via watermark sync?

Re: [DESIGN] Beam Triggered side input specification

2023-03-28 Thread Jan Lukavský
> Makes sense, is this a design decision? I can imagine that waiting 
for side input watermark unconditionally adds latency, on the other hand 
an "unexpected" non-deterministic behavior can confuse users. This type 
of non-determinism after pipeline failure and recovery is among the 
hardest to debug. If we would document (and possibly slightly reimplement) 
the triggered side-input spec, could we add (optional) way to make the 
processing deterministic via watermark sync?


Ah, I see the problem, triggered elements cannot be "synced" via 
watermark, because they carry timestamp defined by TimestampCombiner. 
Makes sense now, thanks. +1


 Jan

On 3/28/23 09:39, Jan Lukavský wrote:



On 3/27/23 19:44, Reuven Lax via dev wrote:



On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský  wrote:

Hi,

I'd like to clarify my understanding. Side inputs generally
perform a left (outer) join, LHS side is the main input, RHS is
the side input.


Not completely - it's more of what I would call a nested-loop join. 
I.e. if the side input changes _nothing happens_ until a new element 
arrives on the LHS. This isn't quite the same as a left-outer join.

+1. This makes sense, my description was a slight simplification.


Doing streaming left join requires watermark synchronization,
thus elements from the main input are buffered until
main_input_timestamp > side_input_watermark. When side input
watermark reaches max watermark, main inputs do not have to be
buffered because the side input will not change anymore. This
works well for cases when the side input is bounded or in the
case of "slowly changing" patterns (ideally with perfect
watermarks, so no late data present).


This is true for non-triggered side inputs. Triggered side inputs 
have always been different - the main-input elements are buffered 
until the first triggered value of the side input is available.
I walked again through the code in SimplePushBackSideInputDoFnRunner 
and looks like this is correct, the runner actually does not wait for 
watermark, but for "ready windows", which implies what you say. With 
a suitable trigger (AfterWatermark.pastEndOfWindow()) this coincides with 
the watermark of end of the window.


Allowing arbitrary changes in the side input (with arbitrary
triggers) might introduce additional questions - how to handle
late data in the side input? Full implementation would require
retractions. Dropping late data does not feel like a solution,
because then the pipeline would not converge to the "correct"
solution, as the side input might hold incorrect value forever.
Applying late data from the processing time the DoFn receives
them could make the downstream processing unstable, restarting
the pipeline on errors might change what is "on time" and what is
late thus generate inconsistent different results.

BTW, triggered side inputs have always been available. The problem 
Kenn is addressing is that nobody has ever written down the spec! 
There was a spec in mind when they were implemented, but the fact 
that this was not written has always been problematic (and especially 
so when creating the portable runner).


Triggered side inputs have always had some non-deterministic behavior, 
not just for late data. Side inputs are cached locally on the reader, 
so different reading workers might have different views on what the 
latest trigger was.
Makes sense, is this a design decision? I can imagine that waiting for 
side input watermark unconditionally adds latency, on the other hand 
an "unexpected" non-deterministic behavior can confuse users. This 
type of non-determinism after pipeline failure and recovery is among 
the hardest to debug. If we would document (and possibly slightly 
reimplement) the triggered side-input spec, could we add (optional) 
way to make the processing deterministic via watermark sync?


It seems safe to process multiple triggers as long as the trigger
does not produce late data, though (i.e. early emitting).
Processing possibly late data might require buffering main input
while main_input_timestamp > side_input_watermark -
allowed_lateness.

Is my line of thinking correct?

 Jan

On 3/23/23 20:19, Kenneth Knowles wrote:

Hi all,

I had a great chat with +Reza Rokni
<mailto:rezaro...@google.com> and +Reuven Lax
<mailto:re...@google.com> yesterday about some inconsistencies
in side input behavior, both before and after portability was
introduced.

I wrote up my thoughts about how we should specify the semantics
and implement them:

https://s.apache.org/beam-triggered-side-inputs

I think I found some issues that I think may require changes in
the portability protocols to get consistent behavior.

Please take a look and find my errors and oversights!

Kenn


Re: [DESIGN] Beam Triggered side input specification

2023-03-28 Thread Jan Lukavský


On 3/27/23 19:44, Reuven Lax via dev wrote:



On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský  wrote:

Hi,

I'd like to clarify my understanding. Side inputs generally
perform a left (outer) join, LHS side is the main input, RHS is
the side input.


Not completely - it's more of what I would call a nested-loop join. 
I.e. if the side input changes _nothing happens_ until a new element 
arrives on the LHS. This isn't quite the same as a left-outer join.

+1. This makes sense, my description was a slight simplification.


Doing streaming left join requires watermark synchronization, thus
elements from the main input are buffered until
main_input_timestamp > side_input_watermark. When side input
watermark reaches max watermark, main inputs do not have to be
buffered because the side input will not change anymore. This
works well for cases when the side input is bounded or in the case
of "slowly changing" patterns (ideally with perfect watermarks, so
no late data present).


This is true for non-triggered side inputs. Triggered side inputs have 
always been different - the main-input elements are buffered until the 
first triggered value of the side input is available.
I walked again through the code in SimplePushBackSideInputDoFnRunner and 
looks like this is correct, the runner actually does not wait for 
watermark, but for "ready windows", which implies what you say. With 
a suitable trigger (AfterWatermark.pastEndOfWindow()) this coincides with 
the watermark of end of the window.


Allowing arbitrary changes in the side input (with arbitrary
triggers) might introduce additional questions - how to handle
late data in the side input? Full implementation would require
retractions. Dropping late data does not feel like a solution,
because then the pipeline would not converge to the "correct"
solution, as the side input might hold incorrect value forever.
Applying late data from the processing time the DoFn receives them
could make the downstream processing unstable, restarting the
pipeline on errors might change what is "on time" and what is late
thus generate inconsistent different results.

BTW, triggered side inputs have always been available. The problem 
Kenn is addressing is that nobody has ever written down the spec! 
There was a spec in mind when they were implemented, but the fact that 
this was not written has always been problematic (and especially so 
when creating the portable runner).


Triggered side inputs have always had some non-deterministic behavior, 
not just for late data. Side inputs are cached locally on the reader, 
so different reading workers might have different views on what the 
latest trigger was.
Makes sense, is this a design decision? I can imagine that waiting for 
side input watermark unconditionally adds latency, on the other hand an 
"unexpected" non-deterministic behavior can confuse users. This type of 
non-determinism after pipeline failure and recovery is among the 
hardest to debug. If we would document (and possibly slightly reimplement) 
the triggered side-input spec, could we add (optional) way to make the 
processing deterministic via watermark sync?


It seems safe to process multiple triggers as long as the trigger
does not produce late data, though (i.e. early emitting).
Processing possibly late data might require buffering main input
while main_input_timestamp > side_input_watermark -
allowed_lateness.

Is my line of thinking correct?

 Jan

On 3/23/23 20:19, Kenneth Knowles wrote:

Hi all,

I had a great chat with +Reza Rokni
<mailto:rezaro...@google.com> and +Reuven Lax
<mailto:re...@google.com> yesterday about some inconsistencies in
side input behavior, both before and after portability was
introduced.

I wrote up my thoughts about how we should specify the semantics
and implement them:

https://s.apache.org/beam-triggered-side-inputs

I think I found some issues that I think may require changes in
the portability protocols to get consistent behavior.

Please take a look and find my errors and oversights!

Kenn


Re: [DESIGN] Beam Triggered side input specification

2023-03-27 Thread Jan Lukavský

Hi,

I'd like to clarify my understanding. Side inputs generally perform a 
left (outer) join, LHS side is the main input, RHS is the side input. 
Doing streaming left join requires watermark synchronization, thus 
elements from the main input are buffered until main_input_timestamp > 
side_input_watermark. When side input watermark reaches max watermark, 
main inputs do not have to be buffered because the side input will not 
change anymore. This works well for cases when the side input is bounded 
or in the case of "slowly changing" patterns (ideally with perfect 
watermarks, so no late data present).


Allowing arbitrary changes in the side input (with arbitrary triggers) 
might introduce additional questions - how to handle late data in the 
side input? Full implementation would require retractions. Dropping late 
data does not feel like a solution, because then the pipeline would not 
converge to the "correct" solution, as the side input might hold 
incorrect value forever. Applying late data from the processing time the 
DoFn receives them could make the downstream processing unstable, 
restarting the pipeline on errors might change what is "on time" and 
what is late thus generate inconsistent different results.


It seems safe to process multiple triggers as long as the trigger does 
not produce late data, though (i.e. early emitting). Processing possibly 
late data might require buffering main input while 
main_input_timestamp > side_input_watermark - allowed_lateness.
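
The buffering rule described above can be sketched as a toy model: a 
main-input element is held while its timestamp is greater than the 
side-input watermark minus the allowed lateness, and released once the 
watermark has advanced far enough. This is plain Java and only models 
the rule as stated in this message, not what Beam's runners actually 
do. MainInputBuffer and its methods are hypothetical, and timestamps 
are plain long values.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of the buffering rule; not Beam's side-input runner code. */
final class MainInputBuffer {
  private final long allowedLateness;
  private long sideInputWatermark = Long.MIN_VALUE;
  private final List<Long> buffered = new ArrayList<>();

  MainInputBuffer(long allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  /** Accepts a main-input timestamp and returns everything that may be processed now. */
  List<Long> onMainInput(long timestamp) {
    buffered.add(timestamp);
    return releaseReady();
  }

  /** Advances the side-input watermark (it never moves backwards) and releases elements. */
  List<Long> onSideInputWatermark(long watermark) {
    sideInputWatermark = Math.max(sideInputWatermark, watermark);
    return releaseReady();
  }

  private List<Long> releaseReady() {
    List<Long> ready = new ArrayList<>();
    for (Iterator<Long> it = buffered.iterator(); it.hasNext(); ) {
      long timestamp = it.next();
      if (!mustBuffer(timestamp)) {
        ready.add(timestamp);
        it.remove();
      }
    }
    return ready;
  }

  /**
   * Same condition as "timestamp > sideInputWatermark - allowedLateness", rearranged so the
   * initial Long.MIN_VALUE watermark does not overflow. Assumes timestamps far from Long.MAX_VALUE.
   */
  private boolean mustBuffer(long timestamp) {
    return timestamp + allowedLateness > sideInputWatermark;
  }
}
```

With an allowed lateness of 5, an element at timestamp 10 stays 
buffered while the side-input watermark is at 12 and is released once 
the watermark reaches 15.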


Is my line of thinking correct?

 Jan

On 3/23/23 20:19, Kenneth Knowles wrote:

Hi all,

I had a great chat with +Reza Rokni  and 
+Reuven Lax  yesterday about some 
inconsistencies in side input behavior, both before and after 
portability was introduced.


I wrote up my thoughts about how we should specify the semantics and 
implement them:


https://s.apache.org/beam-triggered-side-inputs

I think I found some issues that I think may require changes in the 
portability protocols to get consistent behavior.


Please take a look and find my errors and oversights!

Kenn

Re: [VOTE] Release 2.46.0, release candidate #1

2023-03-08 Thread Jan Lukavský

+1 (binding)

Tested Java SDK with Flink and Spark 3 runner.

Thanks,

 Jan

On 3/8/23 01:53, Valentyn Tymofieiev via dev wrote:
+1. Verified the composition of Python containers and ran Python 
pipelines on Dataflow runner v1 and runner v2.


On Tue, Mar 7, 2023 at 4:11 PM Ritesh Ghorse via dev 
 wrote:


+1 (non-binding)
Validated Go SDK quickstart on direct and dataflow runner

On Tue, Mar 7, 2023 at 10:54 AM Alexey Romanenko
 wrote:

+1 (binding)

Tested with https://github.com/Talend/beam-samples/
(Java SDK v8/v11/v17, Spark 3.x runner).

---
Alexey


On 7 Mar 2023, at 07:38, Ahmet Altay via dev
 wrote:

+1 (binding) - I validated python quickstarts on direct &
dataflow runners.

Thank you for doing the release!

On Sat, Mar 4, 2023 at 8:01 AM Chamikara Jayalath via dev
 wrote:

+1 (binding)

Validated multi-language Java and Python pipelines.

On Fri, Mar 3, 2023 at 1:59 PM Danny McCormick via dev
 wrote:

> I have encountered a failure in a Python pipeline
running with Runner v1:

> RuntimeError: Beam SDK base version 2.46.0 does not
match Dataflow Python worker version 2.45.0. Please
check Dataflow worker startup logs and make sure that
correct version of Beam SDK is installed.

> We should understand why Python ValidatesRunner
tests (which have passed)  didn't catch this error.

> This can be remediated in Dataflow containers
without  changes to the release candidate.

Good catch! I've kicked off a release to fix this, it
should be done later this evening - I won't be
available when it completes, but I would expect it to
be around 5:00 PST.

On Fri, Mar 3, 2023 at 3:49 PM Danny McCormick
 wrote:

Hey Reuven, could you provide some more context
on the bug/why it is important? Does it meet the
standard in

https://beam.apache.org/contribute/release-guide/#7-triage-release-blocking-issues-in-github?


The release branch was cut last Wednesday, so
that is why it is not included.


Seems like this was a revert of a previous commit that
was also not included in the 2.46.0 release branch
(https://github.com/apache/beam/pull/25627) ?

If so we might not need a new RC but good to confirm.

Thanks,
Cham


On Fri, Mar 3, 2023 at 3:24 PM Reuven Lax
 wrote:

If possible, I would like to see if we could
include
https://github.com/apache/beam/pull/25642 as
we believe this bug has been impacting
multiple users. This was merged 4 days ago,
but this RC cut does not seem to include it.

On Fri, Mar 3, 2023 at 12:18 PM Valentyn
Tymofieiev via dev  wrote:

I have encountered a failure in a Python
pipeline running with Runner v1:

RuntimeError: Beam SDK base version
2.46.0 does not match Dataflow Python
worker version 2.45.0. Please check
Dataflow worker startup logs and make
sure that correct version of Beam SDK is
installed.

We should understand why Python
ValidatesRunner tests (which have passed)
didn't catch this error.

This can be remediated in Dataflow
containers without changes to the release
candidate.

On Fri, Mar 3, 2023 at 11:22 AM Robert
Bradshaw via dev  wrote:

+1 (binding).

I verified that the artifacts and
signatures all look good, all the
containers are pushed, and tested
some pipelines with a fresh install
from one of the Python wheels.

On Fri, Mar 3, 2023 at 11:13 AM Danny
McCormick
 wrote:
>
> > The released artifacts seem to be
missing the last commit 

Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Jan Lukavský
In the case that the data is too large for side input, you could do the 
same by reassigning timestamps of the BQ input to 
BoundedWindow.TIMESTAMP_MIN_VALUE (you would have to do that in a 
stateful DoFn with a timer having outputTimestamp set to 
TIMESTAMP_MIN_VALUE to hold the watermark, or using a splittable DoFn, or, 
if BQ allows you to specify a timestamp function, use that directly).


In your BusinessLogic() you would set a timer that waits for the 
watermark to move (e.g. timer.offset(1).setRelative()) and buffer 
everything until the timer fires. Because the input from BQ is bounded, 
its watermark will eventually advance to TIMESTAMP_MAX_VALUE, which will 
fire the timer and flush the buffer.
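
To make the buffering logic concrete, here is a minimal plain-Java model of it. It is only a sketch and does not use the Beam API: the class and method names are made up for illustration, the list stands in for what would be a BagState in a stateful DoFn, and onTimer() stands in for the event-time timer callback.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "buffer until the timer fires"; all names are hypothetical. */
final class BufferUntilInitialized {
  private final List<String> buffer = new ArrayList<>();    // stands in for a BagState
  private final List<String> processed = new ArrayList<>(); // what the business logic has seen
  private boolean initialized = false;

  /** Element from the bounded (BQ-like) input: applied immediately. */
  void onBoundedElement(String element) {
    processed.add("init:" + element);
  }

  /** Element from the unbounded input: held back until the timer has fired. */
  void onUnboundedElement(String element) {
    if (initialized) {
      processed.add("event:" + element);
    } else {
      buffer.add(element);
    }
  }

  /** Simulates the timer firing once the watermark has passed the bounded input. */
  void onTimer() {
    initialized = true;
    for (String element : buffer) {
      processed.add("event:" + element);
    }
    buffer.clear();
  }

  List<String> processed() {
    return processed;
  }
}
```

In this model, unbounded elements that arrive before onTimer() are replayed in arrival order after all bounded elements have been applied; later elements pass straight through.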


I think this pattern might be useful on its own, so if you decided to 
implement it, it might be good to incorporate it into the core 
transforms (we already have Wait.on() which is somewhat similar). I can 
imagine a mini-workflow, that would take a bounded and unbounded 
PCollection, a DoFn and a function to be applied on the DoFn first for 
elements of the bounded PCollection and only after that start processing 
the unbounded one.


 Jan

On 2/27/23 20:59, Reuven Lax via dev wrote:
How large is this state spec stored in BQ? If the size isn't too 
large, you can read it from BQ and make it a side input into the DoFn.


On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak 
 wrote:


We are trying to re-initialize our state specs in the
BusinessLogic() DoFn from BQ.
BQ has data about the state spec, and we would like to make sure
that the state specs in our BusinessLogic() dofn are initialized
before it starts consuming the pub/sub.

This is for handling the case of redeployment of the dataflow jobs
so that the states are preserved and the BusinessLogic() can work
seamlessly as it was previously. All our dofns are operating in a
global window and do not perform any aggregation.

We are currently using Redis to preserve the state spec
information but would like to explore using BQ as an alternative
to Redis.

On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles 
wrote:

My suggestion is to try to solve the problem in terms of what
you want to compute. Instead of trying to control the
operational aspects like "read all the BQ before reading
Pubsub" there is presumably some reason that the BQ data
naturally "comes first", for example if its timestamps are
earlier or if there is a join or an aggregation that must
include it. Whenever you think you want to set up an
operational dependency between two things that "happen" in a
pipeline, it is often best to pivot your thinking to the data
and what you are trying to compute, and the built-in
dependencies will solve the ordering problems.

So - is there a way to describe your problem in terms of the
data and what you are trying to compute?

Kenn

On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev
 wrote:

First, PCollections are completely unordered, so there is
no guarantee on what order you'll see events in the
flattened PCollection.

There may be ways to process the BigQuery data in a
separate transform first, but it depends on the structure
of the data. How large is the BigQuery table? Are you
doing any windowed aggregations here?

Reuven

On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak
 wrote:

Yes, this is a streaming pipeline.

Some more details about the existing implementation vs.
what we want to achieve.

Current implementation:
Reading from pub-sub:

Pipeline input = Pipeline.create(options);

PCollection pubsubStream = input.apply("Read From Pubsub",
    PubsubIO.readMessagesWithAttributesAndMessageId()
        .fromSubscription(inputSubscriptionId));

Reading from BigQuery:

PCollection bqStream = input.apply("Read from BQ",
        BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
    .apply("JSon Transform", AsJsons.of(TableRow.class));

Merge the inputs:

PCollection mergedInput = PCollectionList.of(pubsubStream).and(bqStream)
    .apply("Merge Input", Flatten.pCollections());

Business Logic:

mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));

Above logic is what we use currently in our pipeline.

We want to make sure that we read from BigQuery first
& pass the bqStream through our BusinessLogic() before
we start consuming pubsubStream.

Is there a way to achieve this?

Thanks,

  

Re: [ANNOUNCE] New PMC Member: Jan Lukavský

2023-02-17 Thread Jan Lukavský

Thanks everyone!

This is a great honor, I'm grateful for the support of the Apache Beam 
community.


Best,

 Jan

On 2/17/23 11:15, Shivam Singhal wrote:

Congratulations Jan!

On Fri, 17 Feb 2023 at 14:26, Moritz Mack  wrote:

Congrats, Jan!

On 16.02.23, 23:28, "Luke Cwik via dev"  wrote:

Congrats, well deserved.

On Thu, Feb 16, 2023 at 10:32 AM Anand Inguva via dev
 wrote:

Congratulations!!

On Thu, Feb 16, 2023 at 12:42 PM Chamikara Jayalath via dev
 wrote:

Congrats Jan!

On Thu, Feb 16, 2023 at 8:35 AM John Casey via dev
 wrote:

Thanks Jan!

On Thu, Feb 16, 2023 at 11:11 AM Danny McCormick via
dev  wrote:

Congratulations!

On Thu, Feb 16, 2023 at 11:09 AM Reza Rokni via
dev  wrote:

Congratulations!

On Thu, Feb 16, 2023 at 7:47 AM Robert Burke
 wrote:

Congratulations!

On Thu, Feb 16, 2023, 7:44 AM Danielle
Syse via dev  wrote:

Congrats, Jan! That's awesome news.
Thank you for your continued
contributions!

On Thu, Feb 16, 2023 at 10:42 AM
Alexey Romanenko
 wrote:

Hi all,

Please join me and the rest of the
Beam PMC in welcoming Jan Lukavský
 as our newest
PMC member.

Jan has been a part of the Beam
community and a long-time
contributor since 2018 in many
significant ways, including code
contributions in different areas,
participating in technical
discussions, advocating for users,
giving a talk at Beam Summit and
even writing one of the few Beam
books!

Congratulations Jan and thanks for
being a part of Apache Beam!

---
Alexey




Re: [VOTE] Release 2.44.0, release candidate #1

2023-01-11 Thread Jan Lukavský

+1 (non-binding)

Tested Java SDK with Flink runner.

Thanks,

 Jan

On 1/11/23 17:27, Bruno Volpato via dev wrote:

+1 (non-binding)

Tested with 
https://github.com/GoogleCloudPlatform/DataflowTemplates (Java SDK 11, 
Dataflow runner).



Thanks!

On Wed, Jan 11, 2023 at 11:08 AM Alexey Romanenko 
 wrote:


+1 (binding)

Tested with https://github.com/Talend/beam-samples/
(Java SDK v8/v11/v17, Spark 3 runner).

---
Alexey


On 11 Jan 2023, at 16:53, Ritesh Ghorse via dev
 wrote:

+1 (non-binding)
Validated Go Dataframe Transform wrapper on Dataflow runner and
Go SDK quickstart on Direct and Dataflow Runner.

Thanks!

On Wed, Jan 11, 2023 at 12:51 AM Anand Inguva via dev
 wrote:

I ran the Python word count on DirectRunner and Dataflow Runner.

Steps:
1. pip install --pre apache_beam in a fresh virtualenv.
2. Run the command Ahmet provided except removing the
sdk_location from CMD args.

The job was successful. 

On Tue, Jan 10, 2023 at 6:48 PM Ahmet Altay via dev
 wrote:

I validated python quick starts (direct, dataflow) X
(batch, streaming). I ran into an issue with the dataflow
batch case, running the wordcount with the standard:

python -m apache_beam.examples.wordcount \
--output  \
--staging_location  \
--temp_location \
--runner DataflowRunner \
--job_name wordcount-$USER \
--project  \
--num_workers 1 \
--region us-central1 \
--sdk_location apache-beam-2.44.0.zip

results in:


"/usr/local/lib/python3.10/site-packages/dataflow_worker/shuffle.py",
line 589, in __enter__ raise
RuntimeError(_PYTHON_310_SHUFFLE_ERROR_MESSAGE)
RuntimeError: This pipeline requires Dataflow Runner v2
in order to run with currently used version of Apache
Beam on Python 3.10+. Please verify that the Dataflow
Runner v2 is not disabled in the pipeline options or
enable it explicitly via:
--dataflow_service_option=use_runner_v2. Alternatively,
downgrade to Python 3.9 to use Dataflow Runner v1.

Questions:
- I am not explicitly opting out of runner v2, and this
is a standard wordcount example, I expected it to just work.

Then I tried to
add --dataflow_service_option=use_runner_v2 to the above
wordcount command, which results in the following error:

    "message": "Dataflow Runner v2 requires a valid FnApi
job, Please resubmit your job with a valid configuration.
Note that if using Templates, you may need to regenerate
your template with the '--use_runner_v2'."

Maybe I am doing something wrong and it is an error on my
end. It would be good for someone else with python
experience to check this.

/cc @Valentyn Tymofieiev 

Ahmet




On Tue, Jan 10, 2023 at 10:54 AM Kenneth Knowles
 wrote:

I have published a new maven staging repository:

https://repository.apache.org/content/repositories/orgapachebeam-1290/


It looks like it has everything, though I did not
automate a check. At least there were no errors
during publish which I ran with --no-parallel
overnight, and some specific things that were missing
from orgapachebeam-1289 are present.

I will restart the 72 hour waiting period, since the
RC is only now usable.

Kenn

On Mon, Jan 9, 2023 at 6:51 PM Kenneth Knowles
 wrote:

I have discovered that many pom files are missing
from the nexus repository. I should be able to
re-publish a new one. It will take some time as
this is one of the longest-running processes.

On Mon, Jan 9, 2023 at 1:42 PM Kenneth Knowles
 wrote:

Correction: this is release candidate #1.

On Mon, Jan 9, 2023 at 1:25 PM Kenneth
Knowles  wrote:

Hi everyone,

Please review and vote on the release
candidate #3 for the version 2.44.0, as
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release
(please provide specific comments)

Reviewers are encouraged to test their
own use cases with the 

Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-03 Thread Jan Lukavský
Yes, I think we can definitely start with a Spark-specific config option, 
but there could be value for other runners to know if the output of 
@ProcessElement is somewhat limited in size (e.g. can be included 
in a single bundle), or needs to be actively split. This could then be 
incorporated into the naive bounded implementation that is reused by 
multiple runners [1], which currently does not do any (active) splits of 
the running restriction. But this might be a different discussion.


 Jan

[1]https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java#L111

On 1/3/23 11:38, Jozef Vilcek wrote:
Regarding splitting, I think SDF is being split on spark runner, but I 
agree with Jan's comments about split's contract. Specific SDF is also 
free to make decisions about how big the minimal split will be and the 
runner should be able to process that with reasonable resources. E.g. 
ParquetIO is splitting on format's row groups. If the row group is 
larger and format contains a lot of well compressed column data, it 
will challenge memory resources.


Jan, as for suggested options to implement it, I have an MR with 
approach 1) to translate all SDFs to two-threaded executions. I did 
consider something like option 3) but I was not sure if it makes sense 
in general for other runners as well as for Spark. It raises the question 
for me whether it ever makes sense to create an SDF and want it on Spark 
not to use two-thread execution and possibly apply memory pressure.


On Mon, Jan 2, 2023 at 4:49 PM Jan Lukavský  wrote:

There are different translations of streaming and batch Pipelines
in SparkRunner, this thread was focused on the batch part, if I
understand it correctly. Unbounded PCollections are not supported
in batch Spark (by definition). I agree that fixing the splitting
is a valid option, though it still requires unnecessarily big heap
for buffering and/or might induce some overhead with splitting the
restriction. Not to mention, that the splitting is somewhat
optional in the contract of SDF (the DoFn might not support it, if
it is bounded), so it might not solve the issue for all SDFs. The
source might not even be splittable at all (e.g. a completely
compressed blob, without any blocks).

 Jan

On 1/2/23 16:22, Daniel Collins via dev wrote:

If spark's SDF solution doesn't support splitting, fixing that
seems like the best solution to me. Splitting is the mechanism
exposed by the model to actually limit the amount of data
produced in a bundle. If unsupported, then unbounded-per-element
SDFs wouldn't be supported at all.

-Daniel

On Mon, Jan 2, 2023 at 7:46 AM Jan Lukavský  wrote:

Hi Jozef,

I agree that this issue is most likely related to Spark, because
of the way Spark uses functional style for doing flatMap().

It could be fixed with the following two options:

 a) SparkRunner's SDF implementation does not use splitting -
it could be fixed so that the SDF is stopped after N elements
buffered via trySplit, buffer gets flushed and the
restriction is resumed

 b) alternatively use two threads and a BlockingQueue between
them, which is what you propose

The number of output elements per input element is bounded
(we are talking about batch case anyway), but bounded does
not mean it has to fit in memory. Furthermore, unnecessary
buffering of a large number of elements is memory-inefficient,
which is why I think that the two-thread approach (b) should
be the most efficient. The option (a) seems orthogonal and
might be implemented as well.

It raises the question of how to determine if the runner
should do some special translation of SDF in this case. There
are probably only these options:

 1) translate all SDFs to two-thread execution

 2) add runtime flag, that will turn the translation on (once
turned on, it will translate all SDFs) - this is the current
proposal

 3) extend @DoFn.BoundedPerElement annotation with some kind
of (optional) hint - e.g.
@DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE), the default
would be Bounded.FITS_IN_MEMORY (which is the current approach)

The approach (3) seems to give more information to all
runners and might result in the ability to apply various
optimizations for multiple runners, so I'd say that this
might be the ideal variant.

  Jan

On 12/29/22 13:07, Jozef Vilcek wrote:

I am surprised to hear that Dataflow runner (which I never
used) would have this kind of limitation. I see that the
`OutputManager` interface is implemented to write to
`Receiver` [1] which follows the push model. Do you have

Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-02 Thread Jan Lukavský
There are different translations of streaming and batch Pipelines in 
SparkRunner, this thread was focused on the batch part, if I understand 
it correctly. Unbounded PCollections are not supported in batch Spark 
(by definition). I agree that fixing the splitting is a valid option, 
though it still requires unnecessarily big heap for buffering and/or 
might induce some overhead with splitting the restriction. Not to 
mention, that the splitting is somewhat optional in the contract of SDF 
(the DoFn might not support it, if it is bounded), so it might not solve 
the issue for all SDFs. The source might not even be splittable at all 
(e.g. a completely compressed blob, without any blocks).


 Jan

On 1/2/23 16:22, Daniel Collins via dev wrote:
If spark's SDF solution doesn't support splitting, fixing that seems 
like the best solution to me. Splitting is the mechanism exposed by 
the model to actually limit the amount of data produced in a bundle. 
If unsupported, then unbounded-per-element SDFs wouldn't be supported 
at all.


-Daniel

On Mon, Jan 2, 2023 at 7:46 AM Jan Lukavský  wrote:

Hi Jozef,

I agree that this issue is most likely related to Spark, because of
the way Spark uses functional style for doing flatMap().

It could be fixed with the following two options:

 a) SparkRunner's SDF implementation does not use splitting - it
could be fixed so that the SDF is stopped after N elements
buffered via trySplit, buffer gets flushed and the restriction is
resumed

 b) alternatively use two threads and a BlockingQueue between
them, which is what you propose

The number of output elements per input element is bounded (we are
talking about batch case anyway), but bounded does not mean it has
to fit in memory. Furthermore, unnecessary buffering of a large
number of elements is memory-inefficient, which is why I think
that the two-thread approach (b) should be the most efficient. The
option (a) seems orthogonal and might be implemented as well.

It raises the question of how to determine if the runner should do
some special translation of SDF in this case. There are probably
only these options:

 1) translate all SDFs to two-thread execution

 2) add runtime flag, that will turn the translation on (once
turned on, it will translate all SDFs) - this is the current proposal

 3) extend @DoFn.BoundedPerElement annotation with some kind of
(optional) hint - e.g.
@DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE), the default would
be Bounded.FITS_IN_MEMORY (which is the current approach)

The approach (3) seems to give more information to all runners and
might result in the ability to apply various optimizations for
multiple runners, so I'd say that this might be the ideal variant.

  Jan

On 12/29/22 13:07, Jozef Vilcek wrote:

I am surprised to hear that Dataflow runner (which I never used)
would have this kind of limitation. I see that the
`OutputManager` interface is implemented to write to `Receiver`
[1] which follows the push model. Do you have a reference I can
look at to review the must-fit-in-memory limitation?

In Spark, the problem is that the leaf operator pulls data from
previous ones by consuming an `Iterator` of values. As per your
suggestion, this is not a problem with `sources` because they
hold e.g. source file and can pull data as they are being
requested. This gets problematic exactly with SDF and flatMaps
and not sources. It could be one of the reasons why SDF performed
badly on Spark, where the community reported performance degradation
[2] and increased memory use [3].

My proposed solution is, similarly to Dataflow, to use a
`Receiver`-like implementation for DoFns which can output a large
number of elements. For now, this WIP targets SDFs only.

[1]

https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285
[2] https://github.com/apache/beam/pull/14755
[3]

https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005


On Wed, Dec 28, 2022 at 8:52 PM Daniel Collins via dev
 wrote:

I believe that for dataflow runner, the result of
processElement must also fit in memory, so this is not just a
constraint for the spark runner.

The best approach at present might be to convert the source
from a flatMap to an SDF that reads out chunks of the file at
a time, and supports runner checkpointing (i.e. with a file
seek point to resume from) to chunk your data in a way that
d

Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-02 Thread Jan Lukavský

Hi Jozef,

I agree that this issue is most likely related to Spark, because of the 
way Spark uses functional style for doing flatMap().


It could be fixed with the following two options:

 a) SparkRunner's SDF implementation does not use splitting - it could 
be fixed so that the SDF is stopped after N elements buffered via 
trySplit, buffer gets flushed and the restriction is resumed


 b) alternatively use two threads and a BlockingQueue between them, 
which is what you propose
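
To illustrate option (a), here is a minimal plain-Java model of the "stop after N buffered elements, flush, resume" loop. This is only a sketch under simplifying assumptions, not SparkRunner code: the restriction is modelled as a half-open range of positions, each position yields one output, and the returned position plays the role of the residual that a trySplit-style checkpoint would hand back. All names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of option (a); none of these names are Beam APIs. */
final class BoundedBufferSketch {

  /**
   * Processes positions [from, to) until the buffer holds maxBuffered outputs.
   * Returns the first unprocessed position, i.e. where to resume.
   */
  static long processChunk(long from, long to, int maxBuffered, List<Long> buffer) {
    long position = from;
    while (position < to && buffer.size() < maxBuffered) {
      buffer.add(position); // stand-in for one output element
      position++;
    }
    return position;
  }

  /**
   * Driver loop: process a chunk, flush it downstream, resume the residual.
   * Returns the largest buffer size that was ever held in memory.
   */
  static int run(long from, long to, int maxBuffered, Consumer<List<Long>> downstream) {
    if (maxBuffered < 1) {
      throw new IllegalArgumentException("maxBuffered must be at least 1");
    }
    int largestBuffer = 0;
    long resumeFrom = from;
    while (resumeFrom < to) {
      List<Long> buffer = new ArrayList<>();
      resumeFrom = processChunk(resumeFrom, to, maxBuffered, buffer);
      largestBuffer = Math.max(largestBuffer, buffer.size());
      downstream.accept(buffer); // flush before resuming the restriction
    }
    return largestBuffer;
  }
}
```

The point of the model is that memory use is capped by maxBuffered regardless of how large the restriction is, at the price of repeatedly stopping and resuming the processing.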


The number of output elements per input element is bounded (we are 
talking about batch case anyway), but bounded does not mean it has to 
fit in memory. Furthermore, unnecessary buffering of a large number of 
elements is memory-inefficient, which is why I think that the two-thread 
approach (b) should be the most efficient. The option (a) seems 
orthogonal and might be implemented as well.
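
For option (b), a minimal plain-Java sketch of the two-thread hand-off could look like the following. It is a model only, not the code in the WIP commit: the class name is made up, the producer callback stands in for the DoFn pushing outputs, and the Iterator is what a pull-based consumer (as in Spark's flatMap) would read from. Error propagation from the producer thread is deliberately left out.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Plain-Java model of option (b); hypothetical name, null elements not supported. */
final class QueueBackedIterator<T> implements Iterator<T> {
  private static final Object END = new Object(); // end-of-output marker

  private final BlockingQueue<Object> queue;
  private Object next; // look-ahead element, null if not fetched yet

  /**
   * Starts the producer on its own thread. The producer gets an emit callback
   * that blocks as soon as `capacity` elements are waiting to be consumed.
   */
  QueueBackedIterator(int capacity, Consumer<Consumer<T>> producer) {
    this.queue = new ArrayBlockingQueue<>(capacity);
    Thread worker = new Thread(() -> {
      try {
        producer.accept(this::put);
      } finally {
        put(END); // a real implementation would also hand over failures
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  private void put(Object element) {
    try {
      queue.put(element);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  @Override
  public boolean hasNext() {
    if (next == null) {
      try {
        next = queue.take(); // blocks until the producer emits or finishes
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return next != END;
  }

  @Override
  @SuppressWarnings("unchecked")
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T result = (T) next;
    next = null;
    return result;
  }
}
```

With a bounded queue the producer can never run more than `capacity` elements ahead of the consumer, so the number of in-flight outputs stays constant no matter how many elements a single input produces.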


It raises the question of how to determine if the runner should do some 
special translation of SDF in this case. There are probably only these 
options:


 1) translate all SDFs to two-thread execution

 2) add runtime flag, that will turn the translation on (once turned 
on, it will translate all SDFs) - this is the current proposal


 3) extend @DoFn.BoundedPerElement annotation with some kind of 
(optional) hint - e.g. @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE), 
the default would be Bounded.FITS_IN_MEMORY (which is the current approach)


The approach (3) seems to give more information to all runners and might 
result in the ability to apply various optimizations for multiple 
runners, so I'd say that this might be the ideal variant.
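
A rough sketch of what the hint in (3) could look like, written as stand-alone Java so it does not depend on Beam. Everything here is hypothetical: as far as I know, the existing @DoFn.BoundedPerElement takes no arguments, and the enum, the value() member and the runner-side check below are only an illustration of the idea.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical shape of the proposed hint; not part of the Beam API. */
final class BoundedHintSketch {

  enum Bounded { FITS_IN_MEMORY, POSSIBLY_HUGE }

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface BoundedPerElement {
    Bounded value() default Bounded.FITS_IN_MEMORY; // matches the current behaviour
  }

  @BoundedPerElement // no hint given, so the default applies
  static class SmallOutputFn {}

  @BoundedPerElement(Bounded.POSSIBLY_HUGE)
  static class HugeOutputFn {}

  /** What a runner could check at translation time to pick a special translation. */
  static boolean needsStreamingOutput(Class<?> fnClass) {
    BoundedPerElement hint = fnClass.getAnnotation(BoundedPerElement.class);
    return hint != null && hint.value() == Bounded.POSSIBLY_HUGE;
  }
}
```

Because the default is FITS_IN_MEMORY, existing DoFns would keep their current translation and only those that opt in would get the two-thread (or otherwise optimized) execution.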


  Jan

On 12/29/22 13:07, Jozef Vilcek wrote:
I am surprised to hear that Dataflow runner (which I never used) 
would have this kind of limitation. I see that the `OutputManager` 
interface is implemented to write to `Receiver` [1] which follows the 
push model. Do you have a reference I can look at to review the 
must-fit-in-memory limitation?


In Spark, the problem is that the leaf operator pulls data from 
previous ones by consuming an `Iterator` of values. As per your 
suggestion, this is not a problem with `sources` because they hold 
e.g. source file and can pull data as they are being requested. This 
gets problematic exactly with SDF and flatMaps and not sources. It 
could be one of the reasons why SDF performed badly on Spark, where the 
community reported performance degradation [2] and increased memory 
use [3].


My proposed solution is, similarly to Dataflow, to use a `Receiver`-like 
implementation for DoFns which can output a large number of elements. 
For now, this WIP targets SDFs only.


[1] 
https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285

[2] https://github.com/apache/beam/pull/14755
[3] 
https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005 



On Wed, Dec 28, 2022 at 8:52 PM Daniel Collins via dev 
 wrote:


I believe that for dataflow runner, the result of processElement
must also fit in memory, so this is not just a constraint for the
spark runner.

The best approach at present might be to convert the source from a
flatMap to an SDF that reads out chunks of the file at a time, and
supports runner checkpointing (i.e. with a file seek point to
resume from) to chunk your data in a way that doesn't require the
runner to support unbounded outputs from any individual
@ProcessElement downcall.

-Daniel

On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek
 wrote:

Hello,

I am working on an issue which currently limits the Spark runner
by requiring the result of processElement to fit in memory
[1]. This is problematic e.g. for flatMap where the input
element is a file split and generates possibly large output.

The intended fix is to add an option to have dofn processing
over input in one thread and consumption of outputs and
forwarding them to downstream operators in another thread. One
challenge for me is to identify which DoFn should be using
this async approach.

Here [2] is a commit which is WIP and uses async processing
only for SDF naive expansion. I would like to get feedback on:

1) does the approach make sense overall

2) to target DoFns which need async processing (those that generate
possibly large output), I am currently just checking if it is a
DoFn of the SDF naive expansion type [3]. I failed to find a
better / more systematic approach for identifying which DoFn
should benefit from that. I would appreciate any thoughts how
to make 

Re: @RequiresStableInput and Pipeline fusion

2022-12-14 Thread Jan Lukavský

Filed https://github.com/apache/beam/issues/24655.

 Jan

On 12/14/22 00:52, Luke Cwik via dev wrote:
This is definitely not working for portable pipelines since the 
GreedyPipelineFuser doesn't create a fusion boundary which as you 
pointed out causes a single stage that has a non-deterministic 
function followed by one that requires stable input. It seems as 
though we should have runners check the requirements on the 
Pipeline[1] to ensure that they can faithfully process such a pipeline 
and reject anything they don't support early on.


Making the GreedyPipelineFuser insert that fusion break is likely the 
way to go. Runners should be able to look at the ParDoPayload 
requires_stable_input field for the ExecutableStage to see if any 
special handling is necessary on their end before they pass data to 
that stage.
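
As an illustration of the fusion break, here is a toy plain-Java model over a linear chain of transforms. It is not the GreedyPipelineFuser API (the real fuser works on the pipeline graph and its protos); the Transform class and the fuse() method are made up just to show the rule "never fuse a transform that requires stable input with its upstream".

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of fusion over a linear chain; all names are hypothetical. */
final class FusionBreakSketch {

  static final class Transform {
    final String name;
    final boolean requiresStableInput;

    Transform(String name, boolean requiresStableInput) {
      this.name = name;
      this.requiresStableInput = requiresStableInput;
    }
  }

  /**
   * Greedily fuses consecutive transforms into stages, but starts a new stage
   * in front of every transform that requires stable input.
   */
  static List<List<String>> fuse(List<Transform> chain) {
    List<List<String>> stages = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (Transform transform : chain) {
      if (transform.requiresStableInput && !current.isEmpty()) {
        stages.add(current); // fusion break: the runner can checkpoint here
        current = new ArrayList<>();
      }
      current.add(transform.name);
    }
    if (!current.isEmpty()) {
      stages.add(current);
    }
    return stages;
  }
}
```

In this model PairWithRandomKeyFn and MakeSideEffectAndThenFailFn end up in different stages, so the runner gets a boundary at which it can make the input stable before handing it to the second stage.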


[1]: 
https://github.com/apache/beam/blob/77af3237521d94f0399ab405ebac09bbbeded38c/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L111 



On Tue, Dec 13, 2022 at 1:44 AM Jan Lukavský  wrote:

Hi,

I have a question about @RequiresStableInput functionality. I'm
trying to make it work for portable Flink runner [1], [2]. We have
an integration test (which should probably be turned into a
ValidatesRunner test, but that is a different story) [3]. The
test creates a random key for an input element, processes it once,
fails the pipeline and then reprocesses it. This works well
provided there is a checkpoint (shuffle in case of Dataflow)
exactly between assigning the random key (via PairWithRandomKeyFn)
and processing it (via MakeSideEffectAndThenFailFn).

The problem is that GreedyPipelineFuser fuses the transforms
PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single
ExecutableStage. This is then executed with the
@RequiresStableInput requirement, but this obviously assigns a
different key to the reprocessed element(s). This looks like we
need to fix that in the PipelineFuser, is this right? Does this
mean the @RequiresStableInput functionality is actually broken for
all runners that use the default fusion?

Another possibility is that we need to fix the test by adding an
explicit reshuffle (verified, this works), but I think that the
test is actually correct, users would probably not expect
transforms to be fused when crossing the @RequiresStableInput
boundary.

Thoughts?

 Jan


[1] https://github.com/apache/beam/issues/20812
[2] https://github.com/apache/beam/pull/22889
[3]

https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java


@RequiresStableInput and Pipeline fusion

2022-12-13 Thread Jan Lukavský

Hi,

I have a question about @RequiresStableInput functionality. I'm trying 
to make it work for portable Flink runner [1], [2]. We have an 
integration test (which should probably be turned into a ValidatesRunner 
test, but that is a different story) [3]. The test creates a random key 
for an input element, processes it once, fails the pipeline and then 
reprocesses it. This works well provided there is a checkpoint (shuffle 
in case of Dataflow) exactly between assigning the random key (via 
PairWithRandomKeyFn) and processing it (via 
MakeSideEffectAndThenFailFn).


The problem is that GreedyPipelineFuser fuses the transforms 
PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single 
ExecutableStage. This is then executed with the @RequiresStableInput 
requirement, but this obviously assigns a different key to the 
reprocessed element(s). This looks like we need to fix that in the 
PipelineFuser, is this right? Does this mean the @RequiresStableInput 
functionality is actually broken for all runners that use the default 
fusion?


Another possibility is that we need to fix the test by adding an explicit 
reshuffle (verified, this works), but I think that the test is actually 
correct, users would probably not expect transforms to be fused when 
crossing the @RequiresStableInput boundary.


Thoughts?

 Jan


[1] https://github.com/apache/beam/issues/20812
[2] https://github.com/apache/beam/pull/22889
[3] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java


Re: Questions on primitive transforms hierarchy

2022-11-14 Thread Jan Lukavský

I don't think it is necessary in this particular case.

In general, it would be nice to document design decisions that were made 
during the history of Beam and which led to some aspects of the current 
implementation. But I'm afraid it would be rather costly and 
time-consuming. We have design docs, which should be fine for most cases.


 Jan

On 11/14/22 15:25, Sachin Agarwal via dev wrote:

Would it be helpful to add these answers to the Beam docs?

On Mon, Nov 14, 2022 at 4:35 AM Jan Lukavský  wrote:

I somehow missed these answers, Reuven and Kenn, thanks for the
discussion, it helped me clarify my understanding.

 Jan

On 10/26/22 21:10, Kenneth Knowles wrote:



On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský  wrote:

> Not quite IMO. It is a subtle difference. Perhaps these
transforms can be *implemented* using stateful DoFn, but
defining their semantics directly at a high level is more
powerful. The higher level we can make transforms, the more
flexibility we have in the runners. You *could* suggest that
we take the same approach as we do with Combine: not a
primitive, but a special transform that we optimize. You
could say that "vanilla ParDo" is a composite that has a
stateful ParDo implementation, but a runner can implement the
composite more efficiently (without a shuffle). Same with
CoGBK. You could say that there is a default expansion of
CoGBK that uses stateful DoFn (which implies a shuffle) but
that smart runners will not use that expansion.

Yes, semantics > optimizations. For optimizations Beam
already has a facility - PTransformOverride. There is no
fundamental difference about how we treat Combine wrt GBK. It
*can* be expanded using GBK, but "smart runners will not use
that expansion". This is essentially the root of this discussion.

If I rephrase it:

 a) why do we distinguish between "some" actually composite
transforms treating them as primitive, while others have
expansions, although the fundamental reasoning seems the same
for both (performance)?

It is identical to why you can choose different axioms for formal
logic and get all the same provable statements. You have to
choose something. But certainly a runner that just executes
primitives is the bare minimum and all runners are really
expected to take advantage of known composites. Before
portability, the benefit was minimal to have the runner (written
in Java) execute a transform directly vs calling a user DoFn. Now
with portability it could be huge if it avoids a Fn API crossing.

 b) is there a fundamental reason why we do not support
stateful DoFn for merging windows?

No reason. The original design was to force users to only use
"mergeable" state in a stateful DoFn for merging windows. That is
an annoying restriction that we don't really need. So I think the
best way is to have an OnMerge callback. The internal legacy Java
APIs for this are way too complex. But portability wire protocols
support it (I think?) and making a good user facing API for all
the SDKs shouldn't be too hard.

Kenn

I feel that these are related and have historical reasons,
but I'd like to know that for sure. :)

 Jan

On 10/24/22 19:59, Kenneth Knowles wrote:



    On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský
 wrote:

On 10/22/22 21:47, Reuven Lax via dev wrote:

I think we stated that CoGroupByKey was also a
primitive, though in practice it's implemented in terms
of GroupByKey today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles
 wrote:



        On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
 wrote:

Hi,

I have some missing pieces in my understanding
of the set of Beam's primitive transforms,
which I'd like to fill. First a quick recap of
what I think is the current state. We have
(basically) the following primitive transforms:

 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine


Not a primitive, just a well-defined transform that
runners can execute in special ways.


Yep, OK, agree. Performance is orthogonal to semantics.




 - Flatten (pCollections)


The rest, yes.

Inside runners, we most often transform GBK
into ReduceFn (ReduceFnRunner), which does the
 

Re: Questions on primitive transforms hierarchy

2022-11-14 Thread Jan Lukavský
I somehow missed these answers, Reuven and Kenn, thanks for the 
discussion, it helped me clarify my understanding.


 Jan

On 10/26/22 21:10, Kenneth Knowles wrote:



On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský  wrote:

> Not quite IMO. It is a subtle difference. Perhaps these
transforms can be *implemented* using stateful DoFn, but defining
their semantics directly at a high level is more powerful. The
higher level we can make transforms, the more flexibility we have
in the runners. You *could* suggest that we take the same approach
as we do with Combine: not a primitive, but a special transform
that we optimize. You could say that "vanilla ParDo" is a
composite that has a stateful ParDo implementation, but a runner
can implement the composite more efficiently (without a shuffle).
Same with CoGBK. You could say that there is a default expansion
of CoGBK that uses stateful DoFn (which implies a shuffle) but
that smart runners will not use that expansion.

Yes, semantics > optimizations. For optimizations Beam already has
a facility - PTransformOverride. There is no fundamental
difference about how we treat Combine wrt GBK. It *can* be
expanded using GBK, but "smart runners will not use that
expansion". This is essentially the root of this discussion.

If I rephrase it:

 a) why do we distinguish between "some" actually composite
transforms treating them as primitive, while others have
expansions, although the fundamental reasoning seems the same for
both (performance)?

It is identical to why you can choose different axioms for formal 
logic and get all the same provable statements. You have to choose 
something. But certainly a runner that just executes primitives is the 
bare minimum and all runners are really expected to take advantage of 
known composites. Before portability, the benefit was minimal to have 
the runner (written in Java) execute a transform directly vs calling a 
user DoFn. Now with portability it could be huge if it avoids a Fn API 
crossing.


 b) is there a fundamental reason why we do not support stateful
DoFn for merging windows?

No reason. The original design was to force users to only use 
"mergeable" state in a stateful DoFn for merging windows. That is an 
annoying restriction that we don't really need. So I think the best 
way is to have an OnMerge callback. The internal legacy Java APIs for 
this are way too complex. But portability wire protocols support it (I 
think?) and making a good user facing API for all the SDKs shouldn't 
be too hard.


Kenn

I feel that these are related and have historical reasons, but I'd
like to know that for sure. :)

 Jan

On 10/24/22 19:59, Kenneth Knowles wrote:



    On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:

On 10/22/22 21:47, Reuven Lax via dev wrote:

I think we stated that CoGroupByKey was also a primitive,
though in practice it's implemented in terms of GroupByKey
today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles
 wrote:



    On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
 wrote:

Hi,

I have some missing pieces in my understanding of
the set of Beam's primitive transforms, which I'd
like to fill. First a quick recap of what I think is
the current state. We have (basically) the following
primitive transforms:

 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine


Not a primitive, just a well-defined transform that
runners can execute in special ways.


Yep, OK, agree. Performance is orthogonal to semantics.




 - Flatten (pCollections)


The rest, yes.

Inside runners, we most often transform GBK into
ReduceFn (ReduceFnRunner), which does the actual
logic for both GBK and stateful DoFn.


ReduceFnRunner is for windowing / triggers and has
special feature to use a CombineFn while doing it.
Nothing to do with stateful DoFn.


My bad, wrong wording. The point was that *all* of the
semantics of GBK and Combine can be defined in terms of
stateful DoFn. There are some changes needed to stateful DoFn
to support the Combine functionality. But as mentioned above
- optimization is orthogonal to semantics.


Not quite IMO. It is a subtle difference. Perhaps these
transforms can be *implemented* using stateful DoFn, but defining
their semantics directly at a high level is more powerful. The
higher level we can make transforms, the more flexibility we have
in the runners. You *could* sugge

Re: Questions on primitive transforms hierarchy

2022-10-25 Thread Jan Lukavský
> Not quite IMO. It is a subtle difference. Perhaps these transforms 
can be *implemented* using stateful DoFn, but defining their semantics 
directly at a high level is more powerful. The higher level we can make 
transforms, the more flexibility we have in the runners. You *could* 
suggest that we take the same approach as we do with Combine: not a 
primitive, but a special transform that we optimize. You could say that 
"vanilla ParDo" is a composite that has a stateful ParDo implementation, 
but a runner can implement the composite more efficiently (without a 
shuffle). Same with CoGBK. You could say that there is a default 
expansion of CoGBK that uses stateful DoFn (which implies a shuffle) but 
that smart runners will not use that expansion.


Yes, semantics > optimizations. For optimizations Beam already has a 
facility - PTransformOverride. There is no fundamental difference about 
how we treat Combine wrt GBK. It *can* be expanded using GBK, but "smart 
runners will not use that expansion". This is essentially the root of 
this discussion.
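
To make the "Combine can be expanded using GBK" point concrete, here is a minimal sketch in plain Python (not the Beam API; `group_by_key`, `combine_via_gbk` and `combine_lifted` are hypothetical names used only for illustration). It assumes the combining function is associative and commutative, and shows that the default expansion and a pre-combining "smart runner" execution give the same result, so the difference is one of performance and not of semantics.

```python
from collections import defaultdict
from functools import reduce
import operator


def group_by_key(pairs):
    """Model of GBK: collect all values that share a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def combine_via_gbk(pairs, fn):
    """Default expansion: shuffle every element, then reduce each group."""
    return {key: reduce(fn, values) for key, values in group_by_key(pairs).items()}


def combine_lifted(bundles, fn):
    """Optimized execution: pre-combine inside each bundle, shuffle only partial results."""
    partials = [pair for bundle in bundles for pair in combine_via_gbk(bundle, fn).items()]
    return combine_via_gbk(partials, fn)


# Two hypothetical pre-shuffle bundles of (key, value) pairs.
bundles = [[("a", 1), ("a", 2), ("b", 3)], [("a", 4), ("b", 5)]]
all_pairs = [pair for bundle in bundles for pair in bundle]

expanded = combine_via_gbk(all_pairs, operator.add)
lifted = combine_lifted(bundles, operator.add)
```

Both `expanded` and `lifted` hold the same per-key sums; only the amount of data crossing the (modeled) shuffle differs.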


If I rephrase it:

 a) why do we distinguish between "some" actually composite transforms 
treating them as primitive, while others have expansions, although the 
fundamental reasoning seems the same for both (performance)?


 b) is there a fundamental reason why we do not support stateful DoFn 
for merging windows?


I feel that these are related and have historical reasons, but I'd like 
to know that for sure. :)


 Jan

On 10/24/22 19:59, Kenneth Knowles wrote:



On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:

On 10/22/22 21:47, Reuven Lax via dev wrote:

I think we stated that CoGroupByKey was also a primitive, though
in practice it's implemented in terms of GroupByKey today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles 
wrote:



    On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
 wrote:

Hi,

I have some missing pieces in my understanding of the set
of Beam's primitive transforms, which I'd like to fill.
First a quick recap of what I think is the current state.
We have (basically) the following primitive transforms:

 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine


Not a primitive, just a well-defined transform that runners
can execute in special ways.


Yep, OK, agree. Performance is orthogonal to semantics.




 - Flatten (pCollections)


The rest, yes.

Inside runners, we most often transform GBK into ReduceFn
(ReduceFnRunner), which does the actual logic for both
GBK and stateful DoFn.


ReduceFnRunner is for windowing / triggers and has special
feature to use a CombineFn while doing it. Nothing to do with
stateful DoFn.


My bad, wrong wording. The point was that *all* of the semantics
of GBK and Combine can be defined in terms of stateful DoFn. There
are some changes needed to stateful DoFn to support the Combine
functionality. But as mentioned above - optimization is orthogonal
to semantics.


Not quite IMO. It is a subtle difference. Perhaps these transforms can 
be *implemented* using stateful DoFn, but defining their semantics 
directly at a high level is more powerful. The higher level we can 
make transforms, the more flexibility we have in the runners. You 
*could* suggest that we take the same approach as we do with Combine: 
not a primitive, but a special transform that we optimize. You could 
say that "vanilla ParDo" is a composite that has a stateful ParDo 
implementation, but a runner can implement the composite more 
efficiently (without a shuffle). Same with CoGBK. You could say that 
there is a default expansion of CoGBK that uses stateful DoFn (which 
implies a shuffle) but that smart runners will not use that expansion.



I'll compare this to the set of transforms we used to use
in Euphoria (currently java SDK extension):

 - FlatMap ~~ stateless DoFn

 - Union ~~ Flatten

 - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window


Stateful DoFn does not require associative or commutative
operation, while reduce/combine does. Windowing is really
just a secondary key for GBK/Combine that allows completion
of unbounded aggregations but has no computation associated
with it.


Merging WindowFn contains some computation. The fact that stateful
DoFn do not require specific form of reduce function is precisely
what makes it the actual primitive, no?



 - (missing Impulse)


Then you must have some primitive sources with splitting?

 - (missing splittable DoFn)


Kind of the same question - SDF is the one and o

Re: Questions on primitive transforms hierarchy

2022-10-24 Thread Jan Lukavský

On 10/22/22 21:47, Reuven Lax via dev wrote:
I think we stated that CoGroupByKey was also a primitive, though in 
practice it's implemented in terms of GroupByKey today.


On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:



On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:

Hi,

I have some missing pieces in my understanding of the set of
Beam's primitive transforms, which I'd like to fill. First a
quick recap of what I think is the current state. We have
(basically) the following primitive transforms:

 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine


Not a primitive, just a well-defined transform that runners can
execute in special ways.


Yep, OK, agree. Performance is orthogonal to semantics.




 - Flatten (pCollections)


The rest, yes.

Inside runners, we most often transform GBK into ReduceFn
(ReduceFnRunner), which does the actual logic for both GBK and
stateful DoFn.


ReduceFnRunner is for windowing / triggers and has special feature
to use a CombineFn while doing it. Nothing to do with stateful DoFn.

My bad, wrong wording. The point was that *all* of the semantics of GBK 
and Combine can be defined in terms of stateful DoFn. There are some 
changes needed to stateful DoFn to support the Combine functionality. 
But as mentioned above - optimization is orthogonal to semantics.


I'll compare this to the set of transforms we used to use in
Euphoria (currently java SDK extension):

 - FlatMap ~~ stateless DoFn

 - Union ~~ Flatten

 - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window


Stateful DoFn does not require associative or commutative
operation, while reduce/combine does. Windowing is really just a
secondary key for GBK/Combine that allows completion of unbounded
aggregations but has no computation associated with it.

Merging WindowFn contains some computation. The fact that stateful DoFn 
does not require a specific form of reduce function is precisely what makes 
it the actual primitive, no?
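
As an illustration of the computation a merging WindowFn performs, here is a minimal plain-Python sketch of session-style merging (not Beam code; `merge_sessions` and `assign_sessions` are hypothetical names, and windows are modeled as half-open `(start, end)` tuples). It assumes that only overlapping windows merge and that windows which merely touch stay separate.

```python
def merge_sessions(windows):
    """Merge overlapping half-open [start, end) windows into sessions."""
    merged = []
    for start, end in sorted(windows):
        if merged and start < merged[-1][1]:
            # Overlaps the last session: extend it instead of opening a new one.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def assign_sessions(timestamps, gap):
    """Give each element its own proto-window [t, t + gap), then merge them."""
    return merge_sessions([(t, t + gap) for t in timestamps])


sessions = assign_sessions([1, 4, 20, 22, 50], gap=5)
```

The merge depends on all windows seen so far for a key, which is why it is more than a static secondary key.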



 - (missing Impulse)


Then you must have some primitive sources with splitting?

 - (missing splittable DoFn)


Kind of the same question - SDF is the one and only primitive that
creates parallelism.

Original Euphoria had an analogy to (Un)boundedReader. The SDK extension 
in Beam works on top of PCollections and therefore does not deal with IOs.



The ReduceStateByKey is a transform that is a "combinable
stateful DoFn" - i.e. the state might be created pre-shuffle,
on trigger the state is shuffled and then merged. In Beam we
already have CombiningState and MergingState facility (sort
of), which is what is needed, we just do not have the ability
to shuffle the partial states and then combine them. This also
relates to the inability to run stateful DoFn for merging
windowFns, because that is needed there as well. Is this
something that is fundamentally impossible to define for all
runners? What is worth noting is that building, shuffling and
merging the state before shuffle requires compatible trigger
(purely based on watermark), otherwise the transform
falls back to "classical DoFn".


Stateful DoFn for merging windows can be defined. You could
require all state to be mergeable and then it is automatic. Or you
could have an "onMerge" callback. These should both be fine. The
automatic version is less likely to have nonsensical semantics,
but allowing the callback to do "whatever it wants" whether the
result is good or not is more consistent with the design of
stateful DoFn.

Yes, but this is the same for CombineFn, right? The merge (or combine) 
has to be correctly aligned with the computation. The current situation 
is that we do not support stateful DoFns for merging WindowFn [1].
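
A rough plain-Python sketch of what an "onMerge" callback for per-window state could look like (this is not an existing Beam API; `merge_windows_with_state` and `on_merge` are hypothetical names). State is kept per half-open `(start, end)` window, and when two windows overlap the user callback decides how their states are folded together.

```python
def merge_windows_with_state(state_by_window, on_merge):
    """Merge overlapping windows and fold their states via a user callback."""
    result = []  # list of (window, state), ordered by window start
    for window in sorted(state_by_window):
        state = state_by_window[window]
        if result and window[0] < result[-1][0][1]:
            (start, end), previous = result[-1]
            # The callback receives the states of the windows being merged.
            result[-1] = ((start, max(end, window[1])), on_merge([previous, state]))
        else:
            result.append((window, state))
    return dict(result)


# Hypothetical bag-like state: the callback simply concatenates buffered elements.
buffered = {(1, 6): [1], (4, 9): [4], (20, 25): [20]}
merged_bags = merge_windows_with_state(
    buffered, lambda states: [x for s in states for x in s]
)

# "Mergeable" state, such as a counter, corresponds to a fixed callback like sum.
counts = {(1, 6): 1, (4, 9): 1, (20, 25): 1}
merged_counts = merge_windows_with_state(counts, sum)
```

The second call corresponds to the automatic variant, where every state type carries its own merge; the first leaves the merge logic to the user, with the same correctness obligation as for a CombineFn.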



Whether and where a shuffle takes place may vary. Start with the
maths.

Shuffle happens at least whenever there is a need to regroup keys. I'm 
not sure which maths you refer to, can you clarify please?


 Jan

[1] 
https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106




Kenn

Bottom line: I'm thinking of proposing to drop Euphoria
extension, because it has essentially no users and actually no
maintainers, but I have a feeling there is a value in the set
of operators that could be transferred to Beam core, maybe.
I'm pretty sure it would bring value to users to have access
to a "combining stateful DoFn" primitive (even better would be
"combining splittable DoFn").

Looking forward to any comments on this.

 Jan



Questions on primitive transforms hierarchy

2022-10-21 Thread Jan Lukavský

Hi,

I have some missing pieces in my understanding of the set of Beam's 
primitive transforms, which I'd like to fill. First a quick recap of 
what I think is the current state. We have (basically) the following 
primitive transforms:


 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine

 - Flatten (PCollections)


Inside runners, we most often transform GBK into ReduceFn 
(ReduceFnRunner), which does the actual logic for both GBK and stateful 
DoFn.


I'll compare this to the set of transforms we used to use in Euphoria 
(currently a Java SDK extension):


 - FlatMap ~~ stateless DoFn

 - Union ~~ Flatten

 - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window

 - (missing Impulse)

 - (missing splittable DoFn)


The ReduceStateByKey is a transform that is a "combinable stateful DoFn" 
- i.e. the state might be created pre-shuffle, on trigger the state is 
shuffled and then merged. In Beam we already have CombiningState and 
MergingState facility (sort of), which is what is needed, we just do not 
have the ability to shuffle the partial states and then combine them. 
This also relates to the inability to run stateful DoFn for merging 
windowFns, because that is needed there as well. Is this something that 
is fundamentally impossible to define for all runners? What is worth 
noting is that building, shuffling and merging the state before shuffle 
requires a compatible trigger (purely based on watermark), otherwise the 
transform falls back to "classical DoFn".
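
The "combinable stateful DoFn" idea can be sketched in plain Python as follows (not Beam or Euphoria code; `partial_states`, `shuffle_and_merge` and the `(sum, count)` state are hypothetical and serve only as illustration). Partial state is built per key inside each pre-shuffle bundle, the partial states are then regrouped by key, and finally merged.

```python
from collections import defaultdict


def partial_states(bundle, create, add):
    """Build one partial state per key inside a single pre-shuffle bundle."""
    states = {}
    for key, value in bundle:
        states[key] = add(states.get(key, create()), value)
    return states


def shuffle_and_merge(partials, merge):
    """Regroup partial states by key (the shuffle) and merge them per key."""
    grouped = defaultdict(list)
    for states in partials:
        for key, state in states.items():
            grouped[key].append(state)
    return {key: merge(states) for key, states in grouped.items()}


# Hypothetical state: a (sum, count) pair, merged component-wise.
def create():
    return (0, 0)


def add(state, value):
    return (state[0] + value, state[1] + 1)


def merge(states):
    return (sum(s[0] for s in states), sum(s[1] for s in states))


bundles = [[("a", 1), ("b", 10), ("a", 3)], [("a", 5), ("b", 20)]]
result = shuffle_and_merge([partial_states(b, create, add) for b in bundles], merge)
```

Only one state per key and bundle crosses the modeled shuffle. The sketch ignores triggers entirely, so it corresponds to the case where output is emitted once, after all input has been seen.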


Bottom line: I'm thinking of proposing to drop the Euphoria extension, 
because it has essentially no users and actually no maintainers, but I 
have a feeling there is value in its set of operators, which could 
perhaps be transferred to Beam core. I'm pretty sure it would bring value 
to users to have access to a "combining stateful DoFn" primitive (even 
better would be "combining splittable DoFn").


Looking forward to any comments on this.

 Jan


