Pull Request #12670 Review for BEAM-5757

2020-08-26 Thread JITHIN SUKUMAR
Hi beam developers,

I created a PR https://github.com/apache/beam/pull/12670 for resolving the
apache beam issue https://issues.apache.org/jira/browse/BEAM-5757.

I tagged the owners mentioned in OWNERS.md but haven't received any review
yet. Could someone please help by taking a look at the PR?

Thanks.
Regards,
Jithin


Re: Create External Transform with WindowFn

2020-08-26 Thread Robert Burke
Coders should only be checked across the language boundaries.
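For intuition, the boundary-only check being suggested can be sketched outside Beam. The two URNs in the set below appear elsewhere in this thread; the helper function, its signature, and the exact URN set are illustrative assumptions, not the actual Beam implementation.

```python
# Illustrative subset of standard (cross-SDK portable) window fn URNs.
PORTABLE_WINDOW_FN_URNS = {
    "beam:window_fn:global_windows:v1",
    "beam:window_fn:fixed_windows:v1",
}

def check_boundary_windowing(windowing_urn, crosses_language_boundary):
    """Hypothetical check: only PCollections that cross the language
    boundary need a window fn the receiving SDK can understand."""
    if not crosses_language_boundary:
        return  # intermediate PCollections are not checked
    if windowing_urn not in PORTABLE_WINDOW_FN_URNS:
        raise ValueError(
            "window fn %r is not portable across SDKs" % windowing_urn)

# An opaque serialized-Java window fn is fine on an intermediate PCollection:
check_boundary_windowing("beam:window_fn:serialized_java:v1", False)
```

Under this sketch, the `Reshuffle.viaRandomKey()` intermediate collection would pass untouched, and only the PCollection handed to the Python SDK would be validated.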

On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang  wrote:

> Thanks Cham!
>
> I just realized that the *beam:window_fn:serialized_java:v1* is
> introduced by Java *Reshuffle.viaRandomKey()*. But
> *Reshuffle.viaRandomKey()* does rewindow into the original window
> strategy (which is *GlobalWindows* in my case). Is it expected that we
> also check intermediate PCollections rather than only the PCollection
> that crosses the language boundary?
>
> More about my PTransform:
> MyExternalPTransform  -- expand to --  ParDo() -> Reshuffle.viaRandomKey()
> -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>                |
>                -> ParDo() -> output PCollection to Python SDK
>
> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath 
> wrote:
>
>> Also it's strange that Java used (beam:window_fn:serialized_java:v1) for
>> the URN here instead of "beam:window_fn:fixed_windows:v1" [1] which is
>> what is being registered by Python [2]. This seems to be the immediate
>> issue. Tracking bug for supporting custom windows is
>> https://issues.apache.org/jira/browse/BEAM-10507.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>> [2]
>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>
>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath 
>> wrote:
>>
>>> Pipelines that use external WindowingStrategies might be failing during
>>> the proto -> object -> proto conversion we do today. This limitation will go
>>> away once Dataflow directly starts reading Beam protos. We are working on
>>> this now.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang  wrote:
>>>
 Thanks, Robert! I want to add more details on my External PTransform:

 MyExternalPTransform  -- expand to --  ParDo() -> WindowInto(FixWindow)
 -> ParDo() -> output void
                |
                -> ParDo() -> output PCollection to Python SDK
 The full stacktrace:

 INFO:root:Using Java SDK harness container image 
 dataflow-dev.gcr.io/boyuanz/java:latest
 Starting expansion service at localhost:53569
 Aug 13, 2020 7:42:11 PM 
 org.apache.beam.sdk.expansion.service.ExpansionService 
 loadRegisteredTransforms
 INFO: Registering external transforms: [beam:external:java:kafka:read:v1, 
 beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, 
 beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
beam:external:java:kafka:read:v1: 
 org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@4ac68d3e
beam:external:java:kafka:write:v1: 
 org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@277c0f21
beam:external:java:jdbc:read_rows:v1: 
 org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@6073f712
beam:external:java:jdbc:write:v1: 
 org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@43556938
beam:external:java:generate_sequence:v1: 
 org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@3d04a311
 WARNING:apache_beam.options.pipeline_options_validator:Option --zone is 
 deprecated. Please use --worker_zone instead.
 Aug 13, 2020 7:42:12 PM 
 org.apache.beam.sdk.expansion.service.ExpansionService expand
 INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
 Aug 13, 2020 7:42:14 PM 
 org.apache.beam.sdk.expansion.service.ExpansionService expand
 INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'

 WARNING:root:Make sure that locally built Python SDK docker image has 
 Python 3.6 interpreter.
 INFO:root:Using Python SDK docker image: 
 apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at 
 local, we will try to pull from hub.docker.com
 Traceback (most recent call last):
   File "", line 165, in run_filename_as_main
   File "", line 39, in _run_code_in_main
   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, 
 in 
 run()
   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, 
 in run
 test_method(beam.Pipeline(options=pipeline_options))
   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in 
 run_xlang_kafkaio
 pipelin

Re: Create External Transform with WindowFn

2020-08-26 Thread Boyuan Zhang
Thanks Cham!

I just realized that the *beam:window_fn:serialized_java:v1* is
introduced by Java *Reshuffle.viaRandomKey()*. But
*Reshuffle.viaRandomKey()* does rewindow into the original window
strategy (which is *GlobalWindows* in my case). Is it expected that we also
check intermediate PCollections rather than only the PCollection that
crosses the language boundary?

More about my PTransform:
MyExternalPTransform  -- expand to --  ParDo() -> Reshuffle.viaRandomKey()
-> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
               |
               -> ParDo() -> output PCollection to Python SDK

On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath 
wrote:

> Also it's strange that Java used (beam:window_fn:serialized_java:v1) for
> the URN here instead of "beam:window_fn:fixed_windows:v1" [1] which is
> what is being registered by Python [2]. This seems to be the immediate
> issue. Tracking bug for supporting custom windows is
> https://issues.apache.org/jira/browse/BEAM-10507.
>
> [1]
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
> [2]
> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>
> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath 
> wrote:
>
>> Pipelines that use external WindowingStrategies might be failing during
>> the proto -> object -> proto conversion we do today. This limitation will go
>> away once Dataflow directly starts reading Beam protos. We are working on
>> this now.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang  wrote:
>>
>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>
>>> MyExternalPTransform  -- expand to --  ParDo() -> WindowInto(FixWindow)
>>> -> ParDo() -> output void
>>>                |
>>>                -> ParDo() -> output PCollection to Python SDK
>>> The full stacktrace:
>>>
>>> INFO:root:Using Java SDK harness container image 
>>> dataflow-dev.gcr.io/boyuanz/java:latest
>>> Starting expansion service at localhost:53569
>>> Aug 13, 2020 7:42:11 PM 
>>> org.apache.beam.sdk.expansion.service.ExpansionService 
>>> loadRegisteredTransforms
>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, 
>>> beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, 
>>> beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>> beam:external:java:kafka:read:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@4ac68d3e
>>> beam:external:java:kafka:write:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@277c0f21
>>> beam:external:java:jdbc:read_rows:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@6073f712
>>> beam:external:java:jdbc:write:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@43556938
>>> beam:external:java:generate_sequence:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@3d04a311
>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is 
>>> deprecated. Please use --worker_zone instead.
>>> Aug 13, 2020 7:42:12 PM 
>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>> Aug 13, 2020 7:42:14 PM 
>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>
>>> WARNING:root:Make sure that locally built Python SDK docker image has 
>>> Python 3.6 interpreter.
>>> INFO:root:Using Python SDK docker image: 
>>> apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at 
>>> local, we will try to pull from hub.docker.com
>>> Traceback (most recent call last):
>>>   File "", line 165, in run_filename_as_main
>>>   File "", line 39, in _run_code_in_main
>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, 
>>> in 
>>> run()
>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, 
>>> in run
>>> test_method(beam.Pipeline(options=pipeline_options))
>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in 
>>> run_xlang_kafkaio
>>> pipeline.run(False)
>>>   File "apache_beam/pipeline.py", line 534, in run
>>> return self.runner.run_pipeline(self, self._options)
>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in 
>>> run_pipeline
>>> 

Re: JIRA - can't set resolution?

2020-08-26 Thread Kenneth Knowles
In the ticket, I asked to just make it editable. I'm not a fan of all these
"edit only via transition" things. It just ties hands unhelpfully. But the
problem is that I think the ticket itself needs escalation, so my request
there has likely gone unnoticed.

Kenn

On Wed, Aug 26, 2020 at 1:21 PM Udi Meiri  wrote:

> Thanks, I wonder if there's a workaround in the meantime to manually set
> resolution.
>
> On Wed, Aug 26, 2020 at 10:09 AM Kenneth Knowles  wrote:
>
>> I reviewed the guidance and raised the priority. I will follow up more at
>> least to get an acknowledgment.
>>
>> Kenn
>>
>> On Tue, Aug 25, 2020 at 2:37 PM Brian Hulette 
>> wrote:
>>
>>> Yeah this is still broken as described in
>>> https://lists.apache.org/thread.html/r68924a0317a75d7858914b914e1d95fe36e0a9bf1794ef6861df7118%40%3Cdev.beam.apache.org%3E
>>>
>>> Currently blocked on https://issues.apache.org/jira/browse/INFRA-20563
>>>
>>> On Tue, Aug 25, 2020 at 10:34 AM Udi Meiri  wrote:
>>>
 Example: https://issues.apache.org/jira/browse/BEAM-10751
 When I click "resolve issue" the status changes to "resolved" but
 resolution is still "unresolved" and I can't change it.

 I believe there was a change a while back to JIRA?

>>>


Re: JIRA - can't set resolution?

2020-08-26 Thread Udi Meiri
Thanks, I wonder if there's a workaround in the meantime to manually set
resolution.

On Wed, Aug 26, 2020 at 10:09 AM Kenneth Knowles  wrote:

> I reviewed the guidance and raised the priority. I will follow up more at
> least to get an acknowledgment.
>
> Kenn
>
> On Tue, Aug 25, 2020 at 2:37 PM Brian Hulette  wrote:
>
>> Yeah this is still broken as described in
>> https://lists.apache.org/thread.html/r68924a0317a75d7858914b914e1d95fe36e0a9bf1794ef6861df7118%40%3Cdev.beam.apache.org%3E
>>
>> Currently blocked on https://issues.apache.org/jira/browse/INFRA-20563
>>
>> On Tue, Aug 25, 2020 at 10:34 AM Udi Meiri  wrote:
>>
>>> Example: https://issues.apache.org/jira/browse/BEAM-10751
>>> When I click "resolve issue" the status changes to "resolved" but
>>> resolution is still "unresolved" and I can't change it.
>>>
>>> I believe there was a change a while back to JIRA?
>>>
>>




Re: [DISCUSS] Better alignment of Apache Flink and Apache Beam releases

2020-08-26 Thread Kenneth Knowles
Agree with Luke's concerns about branches. We already have separate
subdirectories for Flink versions all on the master branch.

If we replace "branch" with "subdirectory of
https://github.com/apache/beam/tree/master/runners/flink" then we could
have a 1.12-SNAPSHOT variant of the runner which we do not release. This
would automatically catch new incompatibilities. Does Flink publish
snapshots regularly?

We can also create the 1.11 subdirectory at any time and only release it
when ready, no?

At some point our testing matrix will get infeasibly large, but we can
mitigate - for example by running more stable tests less frequently.

Kenn

On Mon, Aug 10, 2020 at 7:33 PM Luke Cwik  wrote:

> Is there a way we could use a fixed point in time Flink nightly that
> passes all the tests/validation and bump up the nightly version manually to
> get "closer" to the release candidate instead of doing another branch?
>
> This would mean that any changes that impact the Flink runner that are
> related to project clean-up or that are cross-cutting would be responsible
> to fix the nightly version as well. It might also lead to fewer
> integrations and merge conflicts when attempting to merge said branch back
> into master.
>
>
> On Mon, Aug 10, 2020 at 3:35 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> I "sense a disturbance in the force" relating to the way we release Beam
>> with supported Flink versions. The current released version of Apache
>> Flink is 1.11.1, while we still support (at least up to Beam 2.24.0)
>> only version 1.10.1. There is a tracking issue for 1.11 support [1], but
>> even if someone starts to work on this soon, it will probably not make
>> it into a release sooner than 2.26.0 (surely not before 2.25.0). I think
>> that the features included in newest Flink releases are pretty much
>> needed by our users, so I'd like to revive a somewhat left-over
>> discussion started in [2]. I think that we could be more aligned with
>> Flink's releases if we created the following workflow:
>>
>>   - when a new Flink version is released, create a new branch for
>> flink-runner-
>>
>>   - this new branch would depend on the published SNAPSHOT version of the
>> not-yet-released version of Flink
>>
>>   - we would need a jenkins job that would periodically do builds
>> against new SNAPSHOTs and notify (some, volunteers welcome :))
>> committers about the status of the build
>>
>>   - this way, we might have people aware of incompatibilities, and
>> (pretty much) increase the chance that the new runner branch would be
>> in shape to be able to switch from SNAPSHOT to release as soon as the
>> version of Beam gets released; merging the released version would mean
>> we create another branch for the new SNAPSHOT of Flink and repeat the
>> process
>>
>> This workflow would rely on volunteer committers (I'm one) who would be
>> willing to be notified about the failures and possibly fix them.
>>
>> Looking forward to opinions or alternative proposals to tackle this.
>>
>>   Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-10612
>>
>> [2]
>>
>> https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E
>>
>>


Re: @StateId uniqueness across DoFn(s)

2020-08-26 Thread Kenneth Knowles
On Tue, Aug 25, 2020 at 8:20 PM Ke Wu  wrote:

> Thank you all for the reply. One last question: I noticed that
> ParDoTest$StateTests#testValueStateSameId seems
> to be testing exactly this case; however, the first ParDo intentionally
> changes the key of the inputs, so the subsequent ParDo would never share
> the same state cell anyway. Is this expected, or do we actually want to
> test that the same state id in different DoFn(s) is completely
> separate?
>

Good catch. You are right that the test is wrong. The two DoFns should have
the same keys, windows, and stateids. Would you care to fix it?

Kenn


>
> On Aug 21, 2020, at 4:50 PM, Robert Bradshaw  wrote:
>
> We should be using PTransform Labels (aka Names), not ids, for naming
> state. This is why the names must line up when doing, for example, a
> Dataflow update operation with Stateful DoFns.
>
> (And, yes, if the user does not specify the transform name, and it is
> autogenerated differently, this will be an error. This is why we throw
> exceptions in the SDK if a name is re-used rather than just appending
> a counter or similar.)
>
>
> On Fri, Aug 21, 2020 at 4:12 PM Ke Wu  wrote:
>
>
> If the user does not explicitly specify a transform name, in which case an
> autogenerated name will be used when generating the unique id, does it
> mean the id could change when the pipeline changes, such as when adding
> extra transforms?
>
> On Aug 21, 2020, at 11:43 AM, Luke Cwik  wrote:
>
> The DoFn is associated with a PTransform and in the pipeline proto there
> is a unique id associated with each PTransform. You can use that to
> generate a composite key (ptransformid, stateid) which will be unique
> within the pipeline.
>
> On Fri, Aug 21, 2020 at 11:26 AM Ke Wu  wrote:
>
>
> Thank you Reuven for the confirmation. Do you know what is the recommended
> way for underlying runners to distinguish same state id in different
> DoFn(s)?
>
> On Aug 21, 2020, at 10:27 AM, Reuven Lax  wrote:
>
> StateId is scoped to the DoFn. You can use the same string in different
> DoFns for completely different states.
>
> On Fri, Aug 21, 2020 at 10:21 AM Ke Wu  wrote:
>
>
> Hello everyone,
>
> After reading through Stateful processing with Apache Beam and
> DoFn.StateId, I understand that each state id must be unique and must be
> of the same type, at least within the same DoFn; however, it is not
> explicitly mentioned whether it is expected and supported for the same
> state id to be declared in different DoFn(s). If yes, is the state
> supposed to be shared, or is it supposed to be completely separate (in
> which case it could even have different types)? If no, it seems that the
> validation in the Beam SDK only checks uniqueness within the same DoFn.
>
> Thanks,
> Ke
>
>
>
>
>
>
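Luke's composite-key suggestion above can be sketched outside Beam. This is a hypothetical illustration of the scoping idea, not actual runner internals; the class and all names are invented for the example.

```python
# Sketch: scoping user state cells by a composite key so that the same
# @StateId string in two different DoFns maps to completely separate state.
class StateStore:
    def __init__(self):
        self._cells = {}

    def cell(self, transform_id, state_id, user_key):
        # The unique PTransform id disambiguates identical state ids
        # declared in different DoFns.
        return self._cells.setdefault((transform_id, state_id, user_key), [])

store = StateStore()
store.cell("ptransform-1", "buffer", "k").append("a")
store.cell("ptransform-2", "buffer", "k").append("b")
# The two cells do not share contents despite the same state id "buffer".
```

The same trick extends to windows: adding the window to the tuple gives the per-key, per-window scoping the thread discusses.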


Re: JIRA - can't set resolution?

2020-08-26 Thread Kenneth Knowles
I reviewed the guidance and raised the priority. I will follow up more at
least to get an acknowledgment.

Kenn

On Tue, Aug 25, 2020 at 2:37 PM Brian Hulette  wrote:

> Yeah this is still broken as described in
> https://lists.apache.org/thread.html/r68924a0317a75d7858914b914e1d95fe36e0a9bf1794ef6861df7118%40%3Cdev.beam.apache.org%3E
>
> Currently blocked on https://issues.apache.org/jira/browse/INFRA-20563
>
> On Tue, Aug 25, 2020 at 10:34 AM Udi Meiri  wrote:
>
>> Example: https://issues.apache.org/jira/browse/BEAM-10751
>> When I click "resolve issue" the status changes to "resolved" but
>> resolution is still "unresolved" and I can't change it.
>>
>> I believe there was a change a while back to JIRA?
>>
>


Re: How does groupIntoBatches behave when there are too few elements for a key?

2020-08-26 Thread Reuven Lax
Yes - you can just add a new timer spec to the DoFn.
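For intuition, the size-or-timeout flushing being proposed can be simulated outside Beam. This is pure Python, not the Beam timer API; the class, the polling model, and all names are illustrative assumptions. The `deadline` plays the role of a processing-time timer, alongside the size check.

```python
import time

# Sketch: emit a batch when it reaches the target size, or when a
# processing-time timeout elapses, so small per-key batches are not held
# forever (the behavior a timeout option would add to GroupIntoBatches).
class Batcher:
    def __init__(self, batch_size, timeout_s, clock=time.monotonic):
        self.batch_size = batch_size
        self.timeout_s = timeout_s
        self.clock = clock
        self.buffer = []
        self.deadline = None

    def add(self, element):
        if not self.buffer:
            # First element of a batch arms the processing-time "timer".
            self.deadline = self.clock() + self.timeout_s
        self.buffer.append(element)
        if len(self.buffer) >= self.batch_size:
            return self._flush()
        return None

    def poll(self):
        # Called periodically; flushes if the timeout fired.
        if self.buffer and self.clock() >= self.deadline:
            return self._flush()
        return None

    def _flush(self):
        batch, self.buffer, self.deadline = self.buffer, [], None
        return batch
```

In the real transform the event-time window-expiration timer would remain as a second, independent flush path, which is why two timer specs in different time domains are needed.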

On Wed, Aug 26, 2020 at 9:26 AM Siyuan Chen  wrote:

> I have been preparing a PR to add the timeout option. I had a dumb
> question - seems to me that the timeout should be set in processing time
> while the existing timer fired at the window expiration is in event time.
> Is there a way to have timers in different time domains?
> --
> Best regards,
> Siyuan
>
>
> On Wed, Aug 26, 2020 at 9:15 AM Reuven Lax  wrote:
>
>> Seems reasonable to add an optional timeout to GroupIntoBatches to flush
>> records.
>>
>> On Wed, Aug 26, 2020 at 9:04 AM Robert Bradshaw 
>> wrote:
>>
>>> GroupIntoBatches sets a timer to flush the batches at the end of the
>>> window [1] no matter how many elements there are. This could cause a
>>> problem for the GlobalWindow if no more data ever comes in.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/release-2.23.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L116
>>>
>>> On Wed, Aug 26, 2020 at 8:55 AM Alex Amato  wrote:
>>> >
>>> > How does groupIntoBatches behave when there are too few elements for a
>>> key (less than the provided batch size)?
>>> >
>>> > Based on how it's described, it's not clear to me that the elements will
>>> ever emit. Can this cause stuckness in this case?
>>>
>>


Re: How does groupIntoBatches behave when there are too few elements for a key?

2020-08-26 Thread Siyuan Chen
I have been preparing a PR to add the timeout option. I had a dumb question
- seems to me that the timeout should be set in processing time while the
existing timer fired at the window expiration is in event time. Is there a
way to have timers in different time domains?
--
Best regards,
Siyuan


On Wed, Aug 26, 2020 at 9:15 AM Reuven Lax  wrote:

> Seems reasonable to add an optional timeout to GroupIntoBatches to flush
> records.
>
> On Wed, Aug 26, 2020 at 9:04 AM Robert Bradshaw 
> wrote:
>
>> GroupIntoBatches sets a timer to flush the batches at the end of the
>> window [1] no matter how many elements there are. This could cause a
>> problem for the GlobalWindow if no more data ever comes in.
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.23.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L116
>>
>> On Wed, Aug 26, 2020 at 8:55 AM Alex Amato  wrote:
>> >
>> > How does groupIntoBatches behave when there are too few elements for a
>> key (less than the provided batch size)?
>> >
>> > Based on how it's described, it's not clear to me that the elements will
>> ever emit. Can this cause stuckness in this case?
>>
>


Re: How does groupIntoBatches behave when there are too few elements for a key?

2020-08-26 Thread Reuven Lax
Seems reasonable to add an optional timeout to GroupIntoBatches to flush
records.

On Wed, Aug 26, 2020 at 9:04 AM Robert Bradshaw  wrote:

> GroupIntoBatches sets a timer to flush the batches at the end of the
> window [1] no matter how many elements there are. This could cause a
> problem for the GlobalWindow if no more data ever comes in.
>
> [1]
> https://github.com/apache/beam/blob/release-2.23.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L116
>
> On Wed, Aug 26, 2020 at 8:55 AM Alex Amato  wrote:
> >
> > How does groupIntoBatches behave when there are too few elements for a
> key (less than the provided batch size)?
> >
> > Based on how it's described, it's not clear to me that the elements will
> ever emit. Can this cause stuckness in this case?
>


Re: How does groupIntoBatches behave when there are too few elements for a key?

2020-08-26 Thread Robert Bradshaw
GroupIntoBatches sets a timer to flush the batches at the end of the
window [1] no matter how many elements there are. This could cause a
problem for the GlobalWindow if no more data ever comes in.

[1] 
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L116

On Wed, Aug 26, 2020 at 8:55 AM Alex Amato  wrote:
>
> How does groupIntoBatches behave when there are too few elements for a key 
> (less than the provided batch size)?
>
> Based on how it's described, it's not clear to me that the elements will ever 
> emit. Can this cause stuckness in this case?


Re: How does groupIntoBatches behave when there are too few elements for a key?

2020-08-26 Thread Luke Cwik
GroupIntoBatches should always emit any buffered elements on window
expiration.

On Wed, Aug 26, 2020 at 8:55 AM Alex Amato  wrote:

> How does groupIntoBatches behave when there are too few elements for a key
> (less than the provided batch size)?
>
> Based on how it's described, it's not clear to me that the elements will
> ever emit. Can this cause stuckness in this case?
>


How does groupIntoBatches behave when there are too few elements for a key?

2020-08-26 Thread Alex Amato
How does groupIntoBatches behave when there are too few elements for a key
(less than the provided batch size)?

Based on how it's described, it's not clear to me that the elements will
ever emit. Can this cause stuckness in this case?


Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-26 Thread Jan Lukavský
Window triggering is afaik an operation that is specific to GBK. Stateful
DoFns can have (as shown in the case of deduplication) timers set for
GC only; triggering has no effect there. And yes, if we have other
timers than the GC timer (any user timers), then we have to have a GC
timer (because timers are a form of state).


Imagine an (admittedly artificial) example of deduplication in a fixed
window of 10 years. It would exhibit exactly the same state growth as a
global window (and 10 years is "almost infinite", right? :)).


Jan

On 8/26/20 10:01 AM, Maximilian Michels wrote:
The inefficiency described happens if and only if the following two 
conditions are met:


 a) there are many timers per single window (as otherwise they will 
be negligible)


 b) there are many keys which actually contain no state (as otherwise 
the timer would be negligible wrt the state size) 


Each window has to have a timer set; it is unavoidable for the window 
computation to be triggered accordingly. This happens regardless of 
whether we have state associated with the key/window or not. The 
additional cleanup timer is just a side effect and not a concern in my 
opinion. Since window computation is per-key, there is no way around 
this. I don't think skipping the cleanup timer for non-global windows 
without state is a good idea just to save one cleanup timer, when 
there are already timers created for the window computation.


Now, the global window is different in that respect because we can't 
assume it is going to be triggered for unbounded streams. Thus, it 
makes sense to me to handle it differently by not using triggers but 
cleaning up once a watermark > MAX_TIMESTAMP has been processed.


-Max

On 26.08.20 09:20, Jan Lukavský wrote:

On 8/25/20 9:27 PM, Maximilian Michels wrote:

I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels 
weird, as there is really nothing special about global window wrt 
state cleanup. 


Why is special handling for the global window weird? After all, it 
is a special case because the global window normally will only be 
cleaned up when the application terminates.


The inefficiency described happens if and only if the following two 
conditions are met:


  a) there are many timers per single window (as otherwise they will 
be negligible)


  b) there are many keys which actually contain no state (as 
otherwise the timer would be negligible wrt the state size)


It only happens to be the case that the global window is the (by far, 
maybe 98% of cases) most common case that satisfies these two 
conditions, but there are other cases as well (e.g. a long-lasting 
fixed window). Discussed options 2) and 3) are systematic in the 
sense that option 2) cancels property a) and option 3) property b). 
Making use of correlation of global window with these two conditions 
to solve the issue is of course possible, but a little unsystematic 
and that's what feels 'weird'. :)




It doesn't change anything wrt migration. The timers that were 
already set remain and keep on contributing to the state size.


That's ok, regular timers for non-global windows need to remain set 
and should be persisted. They will be redistributed when scaling up 
and down.


I'm not sure that's a "problem", rather an inefficiency. But we 
could address it by deleting the timers where they are currently 
set, as mentioned previously.


I had imagined that we don't even set these timers for the global 
window. Thus, there is no need to clean them up.


-Max

On 25.08.20 09:43, Jan Lukavský wrote:
I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels 
weird, as there is really nothing special about global window wrt 
state cleanup. A solution that handles all windows equally would be 
semantically 'cleaner'. If I try to sum up:


  - option 3) seems best, provided that isEmpty() lookup is cheap 
for every state backend (e.g. that we do not hit disk multiple 
times), this option is the best for state size wrt timers in all 
windows


  - option 2) works well for key-aligned windows, also reduces 
state size in all windows


  - option "watermark timer" - solves issue, easily implemented, 
but doesn't improve situation for non-global windows


My conclusion would be - use watermark timer as hotfix, if we can 
prove that isEmpty() would be cheap, then use option 3) as final 
solution, otherwise use 2).


WDYT?

On 8/25/20 5:48 AM, Thomas Weise wrote:



On Mon, Aug 24, 2020 at 1:50 PM Maximilian Michels wrote:


    I'd suggest a modified option (2) which does not use a timer to
    perform
    the cleanup (as mentioned, this will cause problems with 
migrating

    state).


That's a great idea. It's essentially a mix of 1) and 2) for the 
global window only.


It doesn't change anything wrt migration. The timers that 
were already set remain and keep on cont

Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-26 Thread Maximilian Michels

The inefficiency described happens if and only if the following two conditions 
are met:

 a) there are many timers per single window (as otherwise they will be 
negligible)

 b) there are many keys which actually contain no state (as otherwise the timer would be negligible wrt the state size) 


Each window has to have a timer set; it is unavoidable for the window 
computation to be triggered accordingly. This happens regardless of 
whether we have state associated with the key/window or not. The 
additional cleanup timer is just a side effect and not a concern in my 
opinion. Since window computation is per-key, there is no way around 
this. I don't think skipping the cleanup timer for non-global windows 
without state is a good idea just to save one cleanup timer, when there 
are already timers created for the window computation.


Now, the global window is different in that respect because we can't 
assume it is going to be triggered for unbounded streams. Thus, it makes 
sense to me to handle it differently by not using triggers but cleaning 
up once a watermark > MAX_TIMESTAMP has been processed.


-Max
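The global-window alternative described above (no per-key cleanup timers; clean up everything once the watermark passes the end of the global window) can be sketched like this. The class, the sentinel constant, and all names are illustrative assumptions, not the Flink runner's actual state backend.

```python
# Illustrative sentinel standing in for the end of the global window.
GLOBAL_WINDOW_MAX_TS = 9_000_000_000_000_000

class KeyedStateBackend:
    """Sketch: per-key state cleaned up by a single watermark check
    instead of one GC timer per key."""
    def __init__(self):
        self.state = {}  # (key, state_id) -> value

    def put(self, key, state_id, value):
        self.state[(key, state_id)] = value

    def on_watermark(self, watermark):
        # No per-key GC timers: when the watermark passes the end of the
        # global window, enumerate and drop all per-key state at once.
        if watermark > GLOBAL_WINDOW_MAX_TS:
            self.state.clear()
```

This avoids the many-empty-keys-with-timers growth discussed in the thread, at the cost of special-casing the global window.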

On 26.08.20 09:20, Jan Lukavský wrote:

On 8/25/20 9:27 PM, Maximilian Michels wrote:

I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels 
weird, as there is really nothing special about global window wrt 
state cleanup. 


Why is special handling for the global window weird? After all, it is 
a special case because the global window normally will only be cleaned 
up when the application terminates.


The inefficiency described happens if and only if the following two 
conditions are met:


  a) there are many timers per single window (as otherwise they will be 
negligible)


  b) there are many keys which actually contain no state (as otherwise 
the timer would be negligible wrt the state size)


It only happens to be the case that the global window is the (by far, might 
be 98% of cases) most common case that satisfies these two conditions, but 
there are other cases as well (e.g. a long-lasting fixed window). 
Discussed options 2) and 3) are systematic in the sense that option 2) 
cancels property a) and option 3) property b). Making use of correlation 
of global window with these two conditions to solve the issue is of 
course possible, but a little unsystematic and that's what feels 
'weird'. :)




It doesn't change anything wrt migration. The timers that were 
already set remain and keep on contributing to the state size.


That's ok, regular timers for non-global windows need to remain set 
and should be persisted. They will be redistributed when scaling up 
and down.


I'm not sure that's a "problem", rather an inefficiency. But we could 
address it by deleting the timers where they are currently set, as 
mentioned previously.


I had imagined that we don't even set these timers for the global 
window. Thus, there is no need to clean them up.
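A tiny sketch of that registration policy (plain Java simulation; the window-end sentinel is an assumption, not a Beam constant): cleanup timers are registered for bounded windows only, while the global window relies on the final watermark instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulation: only non-global windows get a cleanup timer at all. */
class CleanupTimerPolicy {
  // Illustrative marker for the global window's end; anything smaller
  // models a bounded (non-global) window.
  static final long GLOBAL_WINDOW_END = Long.MAX_VALUE;

  final List<Long> registeredTimers = new ArrayList<>();

  /** Registers a cleanup timer unless the window is the global window. */
  void onNewKeyInWindow(long windowEnd) {
    if (windowEnd != GLOBAL_WINDOW_END) {
      registeredTimers.add(windowEnd); // per-key cleanup timer as before
    }
    // Global window: no timer is set; cleanup happens once the final
    // watermark arrives and all keys are enumerated.
  }

  int timerCount() {
    return registeredTimers.size();
  }
}
```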


-Max

On 25.08.20 09:43, Jan Lukavský wrote:
I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels 
weird, as there is really nothing special about global window wrt 
state cleanup. A solution that handles all windows equally would be 
semantically 'cleaner'. If I try to sum up:


  - option 3) seems best, provided that isEmpty() lookup is cheap for 
every state backend (e.g. that we do not hit disk multiple times), 
this option is the best for state size wrt timers in all windows


  - option 2) works well for key-aligned windows, also reduces state 
size in all windows


  - option "watermark timer" - solves issue, easily implemented, but 
doesn't improve situation for non-global windows


My conclusion would be: use the watermark timer as a hotfix; if we can 
prove that isEmpty() is cheap, then use option 3) as the final 
solution, otherwise use 2).


WDYT?

On 8/25/20 5:48 AM, Thomas Weise wrote:



On Mon, Aug 24, 2020 at 1:50 PM Maximilian Michels wrote:


    I'd suggest a modified option (2) which does not use a timer to
    perform
    the cleanup (as mentioned, this will cause problems with migrating
    state).


That's a great idea. It's essentially a mix of 1) and 2) for the 
global window only.


It doesn't change anything wrt migration. The timers that 
were already set remain and keep on contributing to the state size.


I'm not sure that's a "problem", rather an inefficiency. But we 
could address it by deleting the timers where they are currently 
set, as mentioned previously.



    Instead, whenever we receive a watermark which closes the global
    window,
    we enumerate all keys and cleanup the associated state.

    This is the cleanest and simplest option.

    -Max

    On 24.08.20 20:47, Thomas Weise wrote:
    >
    > On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský <je...@seznam.cz> wrote:

Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-26 Thread Jan Lukavský

On 8/25/20 9:27 PM, Maximilian Michels wrote:

I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels 
weird, as there is really nothing special about global window wrt 
state cleanup. 


Why is special handling for the global window weird? After all, it is 
a special case because the global window normally will only be cleaned 
up when the application terminates.


The inefficiency described happens if and only if the following two 
conditions are met:


 a) there are many timers per single window (as otherwise they will be 
negligible)


 b) there are many keys which actually contain no state (as otherwise 
the timer would be negligible wrt the state size)


It just happens that the global window is (by far, maybe in 98% of 
cases) the most common case that satisfies these two conditions, but 
there are other cases as well (e.g. a long-lasting fixed window). 
The discussed options 2) and 3) are systematic in the sense that option 
2) removes condition a) and option 3) condition b). Making use of the 
correlation of the global window with these two conditions to solve the 
issue is of course possible, but a little unsystematic, and that's what 
feels 'weird'. :)




It doesn't change anything wrt migration. The timers that were 
already set remain and keep on contributing to the state size.


That's ok, regular timers for non-global windows need to remain set 
and should be persisted. They will be redistributed when scaling up 
and down.


I'm not sure that's a "problem", rather an inefficiency. But we could 
address it by deleting the timers where they are currently set, as 
mentioned previously.


I had imagined that we don't even set these timers for the global 
window. Thus, there is no need to clean them up.


-Max

On 25.08.20 09:43, Jan Lukavský wrote:
I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels 
weird, as there is really nothing special about global window wrt 
state cleanup. A solution that handles all windows equally would be 
semantically 'cleaner'. If I try to sum up:


  - option 3) seems best, provided that isEmpty() lookup is cheap for 
every state backend (e.g. that we do not hit disk multiple times), 
this option is the best for state size wrt timers in all windows


  - option 2) works well for key-aligned windows, also reduces state 
size in all windows


  - option "watermark timer" - solves issue, easily implemented, but 
doesn't improve situation for non-global windows


My conclusion would be: use the watermark timer as a hotfix; if we can 
prove that isEmpty() is cheap, then use option 3) as the final 
solution, otherwise use 2).


WDYT?

On 8/25/20 5:48 AM, Thomas Weise wrote:



On Mon, Aug 24, 2020 at 1:50 PM Maximilian Michels wrote:


    I'd suggest a modified option (2) which does not use a timer to
    perform
    the cleanup (as mentioned, this will cause problems with migrating
    state).


That's a great idea. It's essentially a mix of 1) and 2) for the 
global window only.


It doesn't change anything wrt migration. The timers that 
were already set remain and keep on contributing to the state size.


I'm not sure that's a "problem", rather an inefficiency. But we 
could address it by deleting the timers where they are currently 
set, as mentioned previously.



    Instead, whenever we receive a watermark which closes the global
    window,
    we enumerate all keys and cleanup the associated state.

    This is the cleanest and simplest option.

    -Max

    On 24.08.20 20:47, Thomas Weise wrote:
    >
    > On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský <je...@seznam.cz> wrote:
    >
    >      > The most general solution would be 3), given it can be agnostic
    >      > to window types and does not assume extra runner capabilities.
    >
    >     Agree, 2) is optimization to that. It might be questionable if this
    >     is premature optimization, but generally querying multiple states
    >     for each clear operation to any state might be prohibitive, mostly
    >     when the state would be stored in an external database (in case of
    >     Flink that would be RocksDB).
    >
    > For the use case I'm looking at, we are using the heap state backend.
    > I have not checked RocksDB, but would assume that the incremental
    > cost of isEmpty() for other states under the same key is negligible?
    >
    >      > 3) wouldn't require any state migration.
    >
    >     Actually, it would, as we would (ideally) like to migrate users'
    >     pipelines that already contain timers for the end of the global
    >     window, which might not expire ever.
    >
    > Good catch. This could potentially be addressed by upgrading the
    > timer in the per-record path.
    >
    >     On 8/24/20