Re: Output timestamp for Python event timers

2020-08-11 Thread Luke Cwik
+1 on what Boyuan said. It is important that the defaults for the processing
time domain differ from the defaults for the event time domain.

On Tue, Aug 11, 2020 at 12:36 PM Yichi Zhang  wrote:

> +1 to expose set_output_timestamp and enrich python set timer api.
>
> On Tue, Aug 11, 2020 at 12:01 PM Boyuan Zhang  wrote:
>
>> Hi Maximilian,
>>
>> It makes sense to set hold_timestamp to fire_timestamp when the
>> fire_timestamp is in the event time domain. Otherwise, the system may
>> advance the watermark incorrectly.
>> I think we can do something similar to Java FnApiRunner[1]:
>>
>>- Expose set_output_timestamp API to python timer as well
>>- If set_output_timestamp is not specified and timer is in event
>>domain, we can use fire_timestamp as hold_timestamp
>>- Otherwise, use input_timestamp as hold_timestamp.
>>
>> What do you think?
>>
>> [1]
>> https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493
>>
>>
>>
>>
>> On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels 
>> wrote:
>>
>>> We ran into problems setting event time timers per-element in the Python
>>> SDK. Pipeline progress would stall.
>>>
>>> Turns out, although the Python SDK does not expose the timer output
>>> timestamp feature to the user, it sets the timer output timestamp to the
>>> current input timestamp of an element.
>>>
>>> This will lead to holding back the watermark until the timer fires (the
>>> Flink Runner respects the timer output timestamp when advancing the
>>> output watermark). We had set the fire timestamp to a timestamp so far
>>> in the future that pipeline progress would completely stall for
>>> downstream transforms, due to the held-back watermark.
>>>
>>> Considering that this feature is not even exposed to the user in the
>>> Python SDK, I think we should set the default output timestamp to the
>>> fire timestamp, and not to the input timestamp. This is also how timers
>>> work in the Java SDK.
>>>
>>> Let me know what you think.
>>>
>>> -Max
>>>
>>> PR: https://github.com/apache/beam/pull/12531
>>>
>>


Re: [PROPOSAL] Preparing for Beam 2.24.0 release

2020-08-11 Thread Daniel Oliveira
I'd like to send out a last-minute reminder to fill out CHANGES.md with any major
changes that are going to be in 2.24.0. If you need a quick review for
that, just add me as a reviewer to your PR (GitHub username is "youngoli").
I'll keep an eye out for those until around 5 PM.

On another note, I need some help with setup from the release guide:
1. I need someone to add me as a maintainer of the apache-beam package on
PyPI. Username: danoliveira
2. Someone might need to create a new version in JIRA.
I'm not sure about this one: 2.25.0 already exists, and I don't know whether
2.26.0 needs to be created or if that's for the next release.

On Mon, Aug 10, 2020 at 8:27 PM Daniel Oliveira 
wrote:

> Hi everyone,
>
> It seems like there's no objections, so I'm preparing to cut the release
> on Wednesday.
>
> As a reminder, if you have any release-blocking issues, please have a JIRA
> and set "Fix version" to 2.24.0. For non-blocking issues, please set "Fix
> version" only once the issue is actually resolved, otherwise it makes it
> more difficult to differentiate release-blocking issues from non-blocking.
>
> Thanks,
> Daniel Oliveira
>
> On Thu, Aug 6, 2020 at 4:53 PM Rui Wang  wrote:
>
>> Awesome!
>>
>>
>> -Rui
>>
>> On Thu, Aug 6, 2020 at 4:14 PM Ahmet Altay  wrote:
>>
>>> +1 - Thank you Daniel!!
>>>
>>> On Wed, Jul 29, 2020 at 4:30 PM Daniel Oliveira 
>>> wrote:
>>>
 > You probably meant 2.24.0.

 Thanks, yes I did. Mark "Fix Version/s" as "2.24.0" everyone. :)

 On Wed, Jul 29, 2020 at 4:14 PM Valentyn Tymofieiev <
 valen...@google.com> wrote:

> +1, Thanks Daniel!
>
> On Wed, Jul 29, 2020 at 4:04 PM Daniel Oliveira <
> danolive...@google.com> wrote:
>
>> Hi everyone,
>>
>> The next Beam release branch (2.24.0) is scheduled to be cut on
>> August 12 according to the release calendar [1].
>>
>> I'd like to volunteer to handle this release. Following the lead of
>> previous release managers, I plan on cutting the branch on that date and
>> cherrypicking in release-blocking fixes afterwards. So unresolved release
>> blocking JIRA issues should have their "Fix Version/s" marked as 
>> "2.23.0".
>>
> You probably meant 2.24.0 [1].
>
>
>> Any comments or objections?
>>
>> Thanks,
>> Daniel Oliveira
>>
>> [1]
>> https://calendar.google.com/calendar/embed?src=0p73sl034k80oob7seouanigd0%40group.calendar.google.com
>>
> [1] https://issues.apache.org/jira/projects/BEAM/versions/12347146
>



Study On Rejected Refactorings

2020-08-11 Thread Jevgenija Pantiuchina
Dear contributors,

As part of a research team from Università della Svizzera italiana 
(Switzerland) and University of Sannio (Italy), we have analyzed refactoring 
pull requests in the apache/beam repository and are looking for developers for a 
short 5-10 min survey 
(https://usi.eu.qualtrics.com/jfe/form/SV_cO6Ayah0D6q4eSF). Would you please 
spare your time by answering some questions about refactoring-related 
contributions? We would greatly appreciate your input — it would help us 
understand how developers can improve the quality of refactoring contributions, 
and benefit the development process. The responses will be anonymized and 
handled confidentially. Thank you very much!

If you consider this message to be spam, I'm very sorry! There will be no 
follow-up to bug you.


Re: Output timestamp for Python event timers

2020-08-11 Thread Yichi Zhang
+1 to expose set_output_timestamp and enrich python set timer api.

On Tue, Aug 11, 2020 at 12:01 PM Boyuan Zhang  wrote:

> Hi Maximilian,
>
> It makes sense to set hold_timestamp to fire_timestamp when the
> fire_timestamp is in the event time domain. Otherwise, the system may
> advance the watermark incorrectly.
> I think we can do something similar to Java FnApiRunner[1]:
>
>- Expose set_output_timestamp API to python timer as well
>- If set_output_timestamp is not specified and timer is in event
>domain, we can use fire_timestamp as hold_timestamp
>- Otherwise, use input_timestamp as hold_timestamp.
>
> What do you think?
>
> [1]
> https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493
>
>
>
>
> On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels  wrote:
>
>> We ran into problems setting event time timers per-element in the Python
>> SDK. Pipeline progress would stall.
>>
>> Turns out, although the Python SDK does not expose the timer output
>> timestamp feature to the user, it sets the timer output timestamp to the
>> current input timestamp of an element.
>>
>> This will lead to holding back the watermark until the timer fires (the
>> Flink Runner respects the timer output timestamp when advancing the
>> output watermark). We had set the fire timestamp to a timestamp so far
>> in the future that pipeline progress would completely stall for
>> downstream transforms, due to the held-back watermark.
>>
>> Considering that this feature is not even exposed to the user in the
>> Python SDK, I think we should set the default output timestamp to the
>> fire timestamp, and not to the input timestamp. This is also how timers
>> work in the Java SDK.
>>
>> Let me know what you think.
>>
>> -Max
>>
>> PR: https://github.com/apache/beam/pull/12531
>>
>


Re: Stateful Pardo Question

2020-08-11 Thread jmac...@godaddy.com
Ahhh, I see. Thank you very much for this additional info. Really helpful! After 
considering it further, I think it's probably more appropriate and less risky 
in my current scenario to use the Session combiner. I did really like the 
Stateful ParDo way of doing things, though; if it were simpler to get correct 
and as performant as windowing (I understand that Flink has some special 
optimizations for windowing that go all the way down into the RocksDB code), I 
might have liked to see this method work out.

Thanks again!

From: Reuven Lax 
Reply-To: "dev@beam.apache.org" 
Date: Sunday, August 9, 2020 at 11:25 PM
To: dev 
Subject: Re: Stateful Pardo Question

Looking at the code in the repo, it seems to assume that context.timestamp() is 
the "watermark" time. It is not - context.timestamp() is the time of the 
current element being processed. The watermark will generally be smaller 
than the timestamp of the current element, as the watermark is a lower bound on 
element timestamps (so you can't really check context.timestamp() to determine 
if a timer is eligible to fire). It's also worth mentioning that Beam provides 
no ordering guarantees on the input elements (unless you are using TestStream 
in a unit test). In theory they could arrive in reverse timestamp order. In the 
real world that degree of disorder is probably unlikely (and would be 
inefficient, as the watermark would then not advance until all elements were 
processed); however, the model makes no guarantees about order.

The fact that inputs can arrive in any order means that the sessions code you 
are trying to implement would need some more complexity if you wanted it to be 
correct. The problem is that you may have buffered elements from multiple 
different sessions in your bag, and you may see those elements out of order. 
Resetting the timer to event.getTimestamp().plus(SESSION_TIMEOUT) will cause 
you to potentially create a timer that is too early. There are various ways to 
solve this (e.g. storing an interval tree in a separate state tag so you can 
keep track of which sessions are in flight). The upcoming TimestampOrderedList 
state type will also help to make this sort of use case easier and more 
efficient.
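
As a rough illustration of one piece of that extra complexity, here is a sketch
(written against the Python SDK for brevity; names, the gap value, and the state
layout are all illustrative rather than taken from the repro) that only ever
moves the session-gap timer forward by tracking the maximum element timestamp
seen so far. It deliberately does not handle multiple interleaved sessions in
one bag, which is the harder case described above.

    import apache_beam as beam
    from apache_beam.coders import VarIntCoder
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import (
        ReadModifyWriteStateSpec, TimerSpec, on_timer)
    from apache_beam.utils.timestamp import Timestamp

    SESSION_GAP_SECONDS = 600  # illustrative gap, not the repro's value

    class SessionGapFn(beam.DoFn):
        # Applied to a keyed PCollection.
        MAX_TS = ReadModifyWriteStateSpec('max_ts', VarIntCoder())
        GAP_TIMER = TimerSpec('gap', TimeDomain.WATERMARK)

        def process(self, element,
                    ts=beam.DoFn.TimestampParam,
                    max_ts=beam.DoFn.StateParam(MAX_TS),
                    gap=beam.DoFn.TimerParam(GAP_TIMER)):
            # Track the largest element timestamp seen so far, so that an
            # out-of-order (older) element never pulls the timer backwards.
            prev = max_ts.read()
            latest = ts.micros if prev is None else max(prev, ts.micros)
            max_ts.write(latest)
            gap.set(Timestamp(micros=latest) + SESSION_GAP_SECONDS)
            # Buffering of the elements themselves (e.g. in a BagState) and
            # tracking of multiple concurrent sessions is omitted here.

        @on_timer(GAP_TIMER)
        def on_gap(self, max_ts=beam.DoFn.StateParam(MAX_TS)):
            # Emit/flush the buffered session here, then clear the state.
            max_ts.clear()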

Reuven

On Sun, Aug 9, 2020 at 5:05 PM Reza Ardeshir Rokni <raro...@gmail.com> wrote:
+1 on having the behavior clearly documented; it would also be great to try and 
add more state and timer patterns to the Beam docs patterns page 
https://beam.apache.org/documentation/patterns/overview/.

I think it might be worth thinking about describing these kinds of patterns with 
an emphasis on the OnTimer being where the work happens. One thing that would 
make all of this a lot easier by reducing the boilerplate code that would need 
to be written is a sorted map state (a topic of discussion on a few threads).

On Mon, 10 Aug 2020 at 01:16, Reuven Lax <re...@google.com> wrote:
Timers in Beam are considered "eligible to fire" once the watermark has 
advanced. This is not the same as saying that they will fire immediately. You 
should not assume ordering between the elements and the timers.

This is one reason (among many) that Beam does not provide a "read watermark" 
primitive, as it leads to confusion such as this. Since there is no 
read-watermark operator, the only way for a user's ParDo to observe that the 
watermark has advanced is to set a timer and wait for it to expire. Watermarks 
on their own can act in very non-intuitive ways (due to asynchronous 
advancement), so generally we encourage people to reason about timers and 
windowing in their code instead.

Reuven

On Sun, Aug 9, 2020 at 9:39 AM jmac...@godaddy.com <jmac...@godaddy.com> wrote:
I understand that watermarks are concurrently advanced, and that they are 
estimates and not precise, but I'm not sure this is relevant in this case. In 
this repro code we are in processElement() and the watermark HAS advanced, but 
the timer has not been called even though we asked the runtime to do that. In 
this case we are in a per-key stateful operating mode and our timer should not 
be shared with any other runners (is that correct?), so it seems to me that we 
should be able to operate in a manner that is locally consistent from the point 
of view of the DoFn we are writing. That is to say, _before_ we enter 
processElement we check any local timers first. I would argue that this would 
be far more sensible from the author's perspective.

From: Reuven Lax <re...@google.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Thursday, August 6, 2020 at 11:57 PM
To: dev <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question

On Tue, Aug 4, 2020 at 1:08 PM jmac...@godaddy.com <jmac...@godaddy.com> wrote:
So, after some additional 

Re: Output timestamp for Python event timers

2020-08-11 Thread Boyuan Zhang
Hi Maximilian,

It makes sense to set hold_timestamp to fire_timestamp when the
fire_timestamp is in the event time domain. Otherwise, the system may
advance the watermark incorrectly.
I think we can do something similar to Java FnApiRunner[1]:

   - Expose set_output_timestamp API to python timer as well
   - If set_output_timestamp is not specified and timer is in event domain,
   we can use fire_timestamp as hold_timestamp
   - Otherwise, use input_timestamp as hold_timestamp.

What do you think?

[1]
https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493
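
To make the defaults concrete, here is a small pseudocode sketch of the
selection logic (the 'timer' object and its fields are illustrative only, not
actual SDK types):

    from apache_beam.transforms.timeutil import TimeDomain

    def default_hold_timestamp(timer):
        if timer.output_timestamp is not None:
            # User explicitly called set_output_timestamp.
            return timer.output_timestamp
        if timer.time_domain == TimeDomain.WATERMARK:
            # Event-time timer: hold the watermark at the fire timestamp.
            return timer.fire_timestamp
        # Otherwise (processing-time timer): fall back to the input timestamp.
        return timer.input_timestamp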




On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels  wrote:

> We ran into problems setting event time timers per-element in the Python
> SDK. Pipeline progress would stall.
>
> Turns out, although the Python SDK does not expose the timer output
> timestamp feature to the user, it sets the timer output timestamp to the
> current input timestamp of an element.
>
> This will lead to holding back the watermark until the timer fires (the
> Flink Runner respects the timer output timestamp when advancing the
> output watermark). We had set the fire timestamp to a timestamp so far
> in the future that pipeline progress would completely stall for
> downstream transforms, due to the held-back watermark.
>
> Considering that this feature is not even exposed to the user in the
> Python SDK, I think we should set the default output timestamp to the
> fire timestamp, and not to the input timestamp. This is also how timers
> work in the Java SDK.
>
> Let me know what you think.
>
> -Max
>
> PR: https://github.com/apache/beam/pull/12531
>


Re: Stateful Pardo Question

2020-08-11 Thread jmac...@godaddy.com
+1

From: Reza Ardeshir Rokni 
Reply-To: "dev@beam.apache.org" 
Date: Sunday, August 9, 2020 at 5:05 PM
To: dev 
Subject: Re: Stateful Pardo Question

+1 on having the behavior clearly documented; it would also be great to try and 
add more state and timer patterns to the Beam docs patterns page 
https://beam.apache.org/documentation/patterns/overview/.

I think it might be worth thinking about describing these kinds of patterns with 
an emphasis on the OnTimer being where the work happens. One thing that would 
make all of this a lot easier by reducing the boilerplate code that would need 
to be written is a sorted map state (a topic of discussion on a few threads).

On Mon, 10 Aug 2020 at 01:16, Reuven Lax <re...@google.com> wrote:
Timers in Beam are considered "eligible to fire" once the watermark has 
advanced. This is not the same as saying that they will fire immediately. You 
should not assume ordering between the elements and the timers.

This is one reason (among many) that Beam does not provide a "read watermark" 
primitive, as it leads to confusion such as this. Since there is no 
read-watermark operator, the only way for a user's ParDo to observe that the 
watermark has advanced is to set a timer and wait for it to expire. Watermarks 
on their own can act in very non-intuitive ways (due to asynchronous 
advancement), so generally we encourage people to reason about timers and 
windowing in their code instead.

Reuven

On Sun, Aug 9, 2020 at 9:39 AM jmac...@godaddy.com <jmac...@godaddy.com> wrote:
I understand that watermarks are concurrently advanced, and that they are 
estimates and not precise, but I'm not sure this is relevant in this case. In 
this repro code we are in processElement() and the watermark HAS advanced, but 
the timer has not been called even though we asked the runtime to do that. In 
this case we are in a per-key stateful operating mode and our timer should not 
be shared with any other runners (is that correct?), so it seems to me that we 
should be able to operate in a manner that is locally consistent from the point 
of view of the DoFn we are writing. That is to say, _before_ we enter 
processElement we check any local timers first. I would argue that this would 
be far more sensible from the author's perspective.

From: Reuven Lax <re...@google.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Thursday, August 6, 2020 at 11:57 PM
To: dev <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question

On Tue, Aug 4, 2020 at 1:08 PM jmac...@godaddy.com <jmac...@godaddy.com> wrote:
So, after some additional digging, it appears that Beam does not consistently 
check for timer expiry before calling process. The result is that it may be the 
case that the watermark has moved beyond your timer expiry, and if you're 
counting on the timer callback happening at the time you set it for, that 
simply may NOT have happened when you are in DoFn.process(). You can “fix” the 
behavior by simply checking the watermark manually in process() and doing what 
you would normally do for timestamp expiry before proceeding. See my latest 
updated code reproducing the issue and showing the fix at 
https://github.com/randomsamples/pardo_repro.

I would argue that users of this API will naturally expect that timer callback 
semantics will guarantee that when they are in process(), if the current 
watermark is past a timer's expiry, the timer callback in question will have 
been called. Is there any reason why this isn’t happening? Am I 
misunderstanding something?

Timers do not expire synchronously with the watermark advancing. So if you have 
a timer set for 12pm and the watermark advances past 12pm, that timer is now 
eligible to fire, but might not fire immediately. Some other elements may 
process before that timer fires.

There are multiple reasons for this, but one is that Beam does not guarantee 
that watermark advancement is synchronous with element processing. The 
watermark might advance suddenly while in the middle of processing an element, or 
at any other time. This makes it impossible (or at least, exceedingly 
difficult) to really provide the guarantee you expected.

Reuven

From: "jmac...@godaddy.com" 
mailto:jmac...@godaddy.com>>
Reply-To: "dev@beam.apache.org" 
mailto:dev@beam.apache.org>>
Date: Monday, August 3, 2020 at 10:51 AM
To: "dev@beam.apache.org" 
mailto:dev@beam.apache.org>>
Subject: Re: Stateful Pardo Question

Yeah, unless I am misunderstanding something. The output from my repro code 
shows the event timestamp and the context timestamp every time we process an event.

Receiving event at: 

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-08-11 Thread Luke Cwik
There shouldn't be any changes required since the wrapper will smoothly
transition the execution to be run as an SDF. New IOs should strongly
prefer to use SDF since it should be simpler to write and will be more
flexible, but they can still use the "*Source"-based APIs. Eventually we'll
deprecate those APIs, but we will never stop supporting them. Eventually they
should all be migrated to use SDF, and if there is another major Beam
version, we'll finally be able to remove them.
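
For pipeline authors, a minimal example of toggling this behavior via the
experiments mentioned in the quoted message below (shown with the Python
options API for brevity; the experiment strings are what matter and the same
flags apply to Java pipelines):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Opt a pipeline in to the SDF-based read explicitly:
    sdf_options = PipelineOptions(['--experiments=use_sdf_read'])

    # Or keep the old BoundedSource/UnboundedSource execution path:
    legacy_options = PipelineOptions(['--experiments=use_deprecated_read'])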

On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko 
wrote:

> Hi Luke,
>
> Great to hear about such progress on this!
>
> Talking about the opt-out for all runners in the future, will it require any
> code changes for current “*Source”-based IOs, or will the wrappers
> completely smooth this transition?
> Do we need to require new IOs to be created only based on SDF, or again,
> will the wrappers help to avoid this?
>
> On 10 Aug 2020, at 22:59, Luke Cwik  wrote:
>
> In the past couple of months wrappers[1, 2] have been added to the Beam
> Java SDK which can execute BoundedSource and UnboundedSource as Splittable
> DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
> v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
> other pipelines.
>
> I would like to start making the non-portable pipelines starting with the
> DirectRunner[3] to be opt-out with the plan that eventually all runners
> will only execute splittable DoFns and the BoundedSource/UnboundedSource
> specific execution logic from the runners will be removed.
>
> Users will be able to opt-in any pipeline using the experiment
> 'use_sdf_read' and opt-out with the experiment 'use_deprecated_read'. (For
> portable pipelines these experiments were 'beam_fn_api' and
> 'beam_fn_api_use_deprecated_read' respectively and I have added these two
> additional aliases to make the experience less confusing).
>
> 1:
> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
> 2:
> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
> 3: https://github.com/apache/beam/pull/12519
>
>
>


Re: [BEAM-10292] change proposal to DefaultFilenamePolicy.ParamsCoder

2020-08-11 Thread Luke Cwik
The filesystem "fixes" all surmount to removing the "isDirectory" boolean
bit and encoding whether something is a directory in the string part of the
resource specification which also turns out to be backwards incompatible
(just in a different way).

Removing the "directory" bit would be great and that would allow us to use
strings instead of resource ids but would require filesystems to perform
the mapping from some standard path specification to their internal
representation.

On Wed, Aug 5, 2020 at 9:26 PM Chamikara Jayalath 
wrote:

> So, based on the comments in the PR, the underlying issue seems to be
> 'FileBasedSink.convertToFileResourceIfPossible(stringCoder.decode(inStream));'
> not returning the correct result, right?
> If so, I think the correct fix might be your proposal (2) - try to fix the
> underlying filesystem to do a better job of file/dir matching.
>
> This is a bug we probably have to fix anyway for the local filesystem
> and/or HDFS, and this will also give us a solution that does not break
> update compatibility.
>
> Thanks,
> Cham
>
> On Wed, Aug 5, 2020 at 3:41 PM Luke Cwik  wrote:
>
>> Cham, that was one of the options I had mentioned on the PR. The
>> difference here is that this is a bug fix and existing users could be
>> broken unknowingly, so it might be worthwhile to take that breaking change
>> (and possibly provide users a way to perform an upgrade using the old
>> implementation).
>>
>>
>> On Wed, Aug 5, 2020 at 3:33 PM Chamikara Jayalath 
>> wrote:
>>
>>> This might break the update compatibility for Dataflow streaming
>>> pipelines. +Reuven Lax   +Lukasz Cwik
>>> 
>>>
>>> In other cases, to preserve update compatibility, we introduced a user
>>> option that changes the coder only when the user explicitly asks for an
>>> updated feature that requires the new coder. For example,
>>> https://github.com/apache/beam/commit/304882caa89afe24150062b959ee915c79e72ab3
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>> On Mon, Aug 3, 2020 at 10:00 AM David Janíček 
>>> wrote:
>>>
 Hello everyone,

 I've reported an issue https://issues.apache.org/jira/browse/BEAM-10292
 which is about broken DefaultFilenamePolicy.ParamsCoder behavior.
 DefaultFilenamePolicy.ParamsCoder loses information about whether
 DefaultFilenamePolicy.Params's baseFilename resource is a file or a directory
 on some filesystems, at least on the local FS and HDFS.

 After discussion with @dmvk and @lukecwik, we have agreed that the best
 solution could be to take the breaking change and use ResourceIdCoder for
 encoding/decoding DefaultFilenamePolicy.Params's baseFilename; this way the
 file/directory information is preserved.
 The solution is implemented in pull request
 https://github.com/apache/beam/pull/12050.

 I'd like to ask if there is a consensus on this breaking change. Is
 everyone OK with this?
 Thanks in advance for answers.

 Best regards,
 David

>>>


Output timestamp for Python event timers

2020-08-11 Thread Maximilian Michels
We ran into problems setting event time timers per-element in the Python 
SDK. Pipeline progress would stall.


Turns out, although the Python SDK does not expose the timer output 
timestamp feature to the user, it sets the timer output timestamp to the 
current input timestamp of an element.


This will lead to holding back the watermark until the timer fires (the 
Flink Runner respects the timer output timestamp when advancing the 
output watermark). We had set the fire timestamp to a timestamp so far 
in the future that pipeline progress would completely stall for 
downstream transforms, due to the held-back watermark.


Considering that this feature is not even exposed to the user in the 
Python SDK, I think we should set the default output timestamp to the 
fire timestamp, and not to the input timestamp. This is also how timers 
work in the Java SDK.


Let me know what you think.

-Max

PR: https://github.com/apache/beam/pull/12531
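
For reference, a minimal sketch of the kind of per-element event-time timer
that triggered this (the DoFn, the one-hour delay, and the keyed input are
illustrative, not the actual pipeline):

    import apache_beam as beam
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import TimerSpec, on_timer

    class DelayedEmitFn(beam.DoFn):
        # Applied to a keyed PCollection; one event-time timer per element.
        DELAY_TIMER = TimerSpec('delay', TimeDomain.WATERMARK)

        def process(self, element,
                    ts=beam.DoFn.TimestampParam,
                    delay=beam.DoFn.TimerParam(DELAY_TIMER)):
            # Fire one hour past the element's timestamp. With the current
            # default, the element's input timestamp is also used as the timer
            # output timestamp, so the watermark is held back until the timer
            # fires.
            delay.set(ts + 3600)

        @on_timer(DELAY_TIMER)
        def on_delay(self, ts=beam.DoFn.TimestampParam):
            yield ts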


Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

2020-08-11 Thread Alexey Romanenko
Hi Luke,

Great to hear about such progress on this! 

Talking about the opt-out for all runners in the future, will it require any code 
changes for current “*Source”-based IOs, or will the wrappers completely 
smooth this transition? 
Do we need to require new IOs to be created only based on SDF, or again, will the 
wrappers help to avoid this? 

> On 10 Aug 2020, at 22:59, Luke Cwik  wrote:
> 
> In the past couple of months wrappers[1, 2] have been added to the Beam Java 
> SDK which can execute BoundedSource and UnboundedSource as Splittable DoFns. 
> These have been opt-out for portable pipelines (e.g. Dataflow runner v2, 
> XLang pipelines on Flink/Spark) and opt-in using an experiment for all other 
> pipelines.
> 
> I would like to start making the non-portable pipelines starting with the 
> DirectRunner[3] to be opt-out with the plan that eventually all runners will 
> only execute splittable DoFns and the BoundedSource/UnboundedSource specific 
> execution logic from the runners will be removed.
> 
> Users will be able to opt-in any pipeline using the experiment 'use_sdf_read' 
> and opt-out with the experiment 'use_deprecated_read'. (For portable 
> pipelines these experiments were 'beam_fn_api' and 
> 'beam_fn_api_use_deprecated_read' respectively and I have added these two 
> additional aliases to make the experience less confusing).
> 
> 1: https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
> 2: https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
> 3: https://github.com/apache/beam/pull/12519
> 



Re: Status of dynamic worker scaling with Kafka consumers

2020-08-11 Thread Adam Bellemare
Thank you Alexey, I appreciate your responses.

On Tue, Aug 11, 2020 at 10:57 AM Alexey Romanenko 
wrote:

> Hi Adam,
>
> 1) Correct. The current KafkaIO.Read implementation is based on Beam's
> “UnboundedSource”, which requires a fixed number of splits at DAG
> construction time.
> 2) Correct.
>
> Dynamic topic and partition discovery is a long story in Beam. Since
> you are interested in this, it would be worth taking a look at these
> discussions [1][2]. One way to have it in Beam is to use
> SplittableDoFn [3] instead of the UnboundedSource API. As I mentioned before,
> there is ongoing work to make KafkaIO read with SDF [4], and that
> should allow discovering new partitions/topics at runtime in the future.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5786
> [2] https://issues.apache.org/jira/browse/BEAM-727
> [3] https://beam.apache.org/blog/splittable-do-fn/
> [4] https://issues.apache.org/jira/browse/BEAM-9977
>
> On 11 Aug 2020, at 15:01, Adam Bellemare  wrote:
>
> Hello Alexey
>
> Thank you for replying to my questions. A number of my colleagues have
> been musing about the idea of dynamically changing the partition count of
> Apache Kafka's input topics for Beam jobs during runtime (We intend to use
> the Google Dataflow runner for our jobs). I have been hesitant to endorse
> such an operation because my understanding of Beam at this point in time is
> that dynamically scaling the topic partition count up will not be
> automatically detected by the Beam job, such that these partitions will go
> unassigned until the job is restarted.
>
> This, of course, ignores the impact to the state stores, particularly
> data-locality issues. My understanding here (again) is that Beam stores
> keyed state in alignment with the kafka partitions, and so changing the
> partition count would affect the distribution of state significantly (which
> is my primary reason to oppose this operation).
>
> In sum, if you (or anyone else reading this email!) could refute or
> support these statements I would be very grateful:
> 1) Beam doesn't support dynamic upscaling of Kafka partition counts. The
> job needs to be restarted to pick new partitions up (which is in line with
> many other stream processors, and not something I would consider a defect)
> 2) A job's state pertaining to a Kafka source (such as materializing a
> stream) is divided along the Kafka partition boundaries.
>
> Thanks!
>
> On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko 
> wrote:
>
>> Hi Adam,
>>
>> 1) Yes, correct. Though there is ongoing work to do it at runtime and
>> support topic/partition discovery.
>>
>> 2) Yes, but if a worker fails, its task (reading from a specific
>> partition in the case of KafkaIO) will be assigned to a different one. How? It
>> depends on the underlying data processing engine.
>>
>> 3) In general - yes, but some specific things, like storing the
>> checkpoints for unbounded sources, could differ in terms of
>> implementation. The Beam model should be applied in the same way for
>> different runners; however, the implementation can vary. This is actually
>> why Beam runners exist - they apply the Beam model on different data processing
>> engines and make it unified for Beam users.
>>
>> 4) Please, see 3)
>>
>> I hope it will shed some light =) Please, let us know if you have more
>> questions.
>>
>> Regards,
>> Alexey
>>
>> On 6 Aug 2020, at 18:57, Adam Bellemare  wrote:
>>
>> Hi Folks
>>
>> When processing events from Kafka, it seems that, from my reading, the
>> distribution of partitions maps directly to the worker via the concept of
>> 'splits' :
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>
>> From the code:
>>
>> > The partitions are evenly distributed among the splits. The number of
>> splits returned is {@code
>> > min(desiredNumSplits, totalNumPartitions)}, though better not to depend
>> on the exact count.
>>
>> > It is important to assign the partitions deterministically so that
>> we can support resuming a
>> > split from last checkpoint. The Kafka partitions are sorted by {@code
>> } and then
>> > assigned to splits in round-robin order.
>>
>> I'm not intimately familiar with Beam's execution model, but my reading
>> of this code suggests that:
>> 1) Beam allocates partitions to workers once, at creation time
>> 2) This implies that once started, the worker count cannot be changed as
>> the partitions are not redistributed
>> 3) Any state is tied to the split, which is in turn tied to the worker.
>> This means outside of, say, a global window,
>> materialized kafka state is "localized" to a worker.
>>
>> Follow up Q:
>> 4) Is this independent of the runner? I am much more familiar with Apache
>> Spark as a runner than say, Dataflow.
>>
>> If any could confirm or refute my 3 

Re: Memory Issue When Running Beam On Flink

2020-08-11 Thread Maximilian Michels

Hi!

Looks like a potential leak, caused by your code or by Beam itself. 
Would you be able to supply a heap dump from one of the task managers? 
That would greatly help debugging this issue.


-Max

On 07.08.20 00:19, David Gogokhiya wrote:

Hi,

We recently started using Apache Beam version 2.20.0 running on Flink 
version 1.9 deployed on Kubernetes to process unbounded streams of data. 
However, we noticed that the memory consumed by stateful Beam is 
steadily increasing over time with no drops, no matter what the current 
bandwidth is. We were wondering if this is expected and, if not, what 
would be the best way to resolve it.



  More Context

We have the following pipeline that consumes messages from the unbounded 
stream of data. Later we deduplicate the messages based on a unique 
message id using the deduplicate function. 
Since we are using Beam version 2.20.0, we copied the source code of the 
deduplicate function from 
version 2.22.0. After that we unmap the tuple, retrieve the necessary 
data from the message payload and dump the corresponding data into the log.



Pipeline:


Flink configuration:


As we mentioned before, we noticed that the memory usage of the 
jobmanager and taskmanager pods is steadily increasing with no drops, no 
matter what the current bandwidth is. We tried allocating more memory, 
but it seems like no matter how much memory we allocate, it eventually 
reaches its limit and then tries to restart itself.



Sincerely, David




Re: Status of dynamic worker scaling with Kafka consumers

2020-08-11 Thread Alexey Romanenko
Hi Adam,

1) Correct. The current KafkaIO.Read implementation is based on Beam's 
“UnboundedSource”, which requires a fixed number of splits at DAG 
construction time.
2) Correct.

Dynamic topic and partition discovery is a long story in Beam. Since you 
are interested in this, it would be worth taking a look at these discussions 
[1][2]. One way to have it in Beam is to use SplittableDoFn [3] instead 
of the UnboundedSource API. As I mentioned before, there is ongoing work to 
make KafkaIO read with SDF [4], and that should allow discovering 
new partitions/topics at runtime in the future.

[1] https://issues.apache.org/jira/browse/BEAM-5786
[2] https://issues.apache.org/jira/browse/BEAM-727 
[3] https://beam.apache.org/blog/splittable-do-fn/
[4] https://issues.apache.org/jira/browse/BEAM-9977
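
As a plain illustration (ordinary Python, not the actual Beam code) of the
deterministic round-robin split assignment that the KafkaUnboundedSource
comment quoted below describes:

    def assign_partitions_to_splits(partitions, desired_num_splits):
        # partitions: list of (topic, partition) tuples.
        # Sort for a deterministic assignment, then distribute round-robin;
        # the number of splits is min(desiredNumSplits, totalNumPartitions).
        partitions = sorted(partitions)
        num_splits = min(desired_num_splits, len(partitions))
        splits = [[] for _ in range(num_splits)]
        for i, partition in enumerate(partitions):
            splits[i % num_splits].append(partition)
        return splits

    # Example: 6 partitions spread over 4 desired splits -> sizes 2, 2, 1, 1.
    print(assign_partitions_to_splits([('topic', i) for i in range(6)], 4))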

> On 11 Aug 2020, at 15:01, Adam Bellemare  wrote:
> 
> Hello Alexey
> 
> Thank you for replying to my questions. A number of my colleagues have been 
> musing about the idea of dynamically changing the partition count of Apache 
> Kafka's input topics for Beam jobs during runtime (We intend to use the 
> Google Dataflow runner for our jobs). I have been hesitant to endorse such an 
> operation because my understanding of Beam at this point in time is that 
> dynamically scaling the topic partition count up will not be automatically 
> detected by the Beam job, such that these partitions will go unassigned until 
> the job is restarted. 
> 
> This, of course, ignores the impact to the state stores, particularly 
> data-locality issues. My understanding here (again) is that Beam stores keyed 
> state in alignment with the kafka partitions, and so changing the partition 
> count would affect the distribution of state significantly (which is my 
> primary reason to oppose this operation).
> 
> In sum, if you (or anyone else reading this email!) could refute or support 
> these statements I would be very grateful:
> 1) Beam doesn't support dynamic upscaling of Kafka partition counts. The job 
> needs to be restarted to pick new partitions up (which is in line with many 
> other stream processors, and not something I would consider a defect)
> 2) A job's state pertaining to a Kafka source (such as materializing a 
> stream) is divided along the Kafka partition boundaries.
> 
> Thanks!
> 
> On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko wrote:
> Hi Adam,
> 
> 1) Yes, correct. Though there is ongoing work to do it at runtime and 
> support topic/partition discovery. 
> 
> 2) Yes, but if a worker fails, its task (reading from a specific partition in 
> the case of KafkaIO) will be assigned to a different one. How? It depends on 
> the underlying data processing engine.
> 
> 3) In general - yes, but some specific things, like storing the checkpoints 
> for unbounded sources, could differ in terms of implementation. The 
> Beam model should be applied in the same way for different runners; however, 
> the implementation can vary. This is actually why Beam runners exist - they 
> apply the Beam model on different data processing engines and make it unified for 
> Beam users.
> 
> 4) Please, see 3)
> 
> I hope it will shed some light =) Please, let us know if you have more 
> questions.
> 
> Regards,
> Alexey
> 
>> On 6 Aug 2020, at 18:57, Adam Bellemare wrote:
>> 
>> Hi Folks
>> 
>> When processing events from Kafka, it seems that, from my reading, the 
>> distribution of partitions maps directly to the worker via the concept of 
>> 'splits' :
>> 
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>> 
>> From the code:
>> 
>> > The partitions are evenly distributed among the splits. The number of 
>> > splits returned is {@code 
>> > min(desiredNumSplits, totalNumPartitions)}, though better not to depend on 
>> > the exact count.
>> 
>> > It is important to assign the partitions deterministically so that we 
>> > can support resuming a 
>> > split from last checkpoint. The Kafka partitions are sorted by {@code 
>> > } and then
>> > assigned to splits in round-robin order.
>> 
>> I'm not intimately familiar with Beam's execution model, but my reading of 
>> this code suggests that:
>> 1) Beam allocates partitions to workers once, at creation time
>> 2) This implies that once started, the worker count cannot be changed as the 
>> partitions are not redistributed
>> 3) Any state is tied to the split, which is in turn tied to the worker. This 
>> means outside of, say, a global window, 
>> materialized kafka state is "localized" to a worker.
>> 
>> Follow up Q:
>> 4) Is this independent of the runner? I 

Re: Status of dynamic worker scaling with Kafka consumers

2020-08-11 Thread Adam Bellemare
Hello Alexey

Thank you for replying to my questions. A number of my colleagues have been
musing about the idea of dynamically changing the partition count of Apache
Kafka's input topics for Beam jobs during runtime (We intend to use the
Google Dataflow runner for our jobs). I have been hesitant to endorse such
an operation because my understanding of Beam at this point in time is that
dynamically scaling the topic partition count up will not be automatically
detected by the Beam job, such that these partitions will go unassigned
until the job is restarted.

This, of course, ignores the impact to the state stores, particularly
data-locality issues. My understanding here (again) is that Beam stores
keyed state in alignment with the kafka partitions, and so changing the
partition count would affect the distribution of state significantly (which
is my primary reason to oppose this operation).

In sum, if you (or anyone else reading this email!) could refute or support
these statements I would be very grateful:
1) Beam doesn't support dynamic upscaling of Kafka partition counts. The
job needs to be restarted to pick new partitions up (which is in line with
many other stream processors, and not something I would consider a defect)
2) A job's state pertaining to a Kafka source (such as materializing a
stream) is divided along the Kafka partition boundaries.

Thanks!

On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko 
wrote:

> Hi Adam,
>
> 1) Yes, correct. Though there is ongoing work to do it at runtime and
> support topic/partition discovery.
>
> 2) Yes, but if a worker fails, its task (reading from a specific partition
> in the case of KafkaIO) will be assigned to a different one. How? It depends on
> the underlying data processing engine.
>
> 3) In general - yes, but some specific things, like storing the
> checkpoints for unbounded sources, could differ in terms of
> implementation. The Beam model should be applied in the same way for
> different runners; however, the implementation can vary. This is actually
> why Beam runners exist - they apply the Beam model on different data processing
> engines and make it unified for Beam users.
>
> 4) Please, see 3)
>
> I hope it will shed some light =) Please, let us know if you have more
> questions.
>
> Regards,
> Alexey
>
> On 6 Aug 2020, at 18:57, Adam Bellemare  wrote:
>
> Hi Folks
>
> When processing events from Kafka, it seems that, from my reading, the
> distribution of partitions maps directly to the worker via the concept of
> 'splits' :
>
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>
> From the code:
>
> > The partitions are evenly distributed among the splits. The number of
> splits returned is {@code
> > min(desiredNumSplits, totalNumPartitions)}, though better not to depend
> on the exact count.
>
> > It is important to assign the partitions deterministically so that we
> can support resuming a
> > split from last checkpoint. The Kafka partitions are sorted by {@code
> } and then
> > assigned to splits in round-robin order.
>
> I'm not intimately familiar with Beam's execution model, but my reading of
> this code suggests that:
> 1) Beam allocates partitions to workers once, at creation time
> 2) This implies that once started, the worker count cannot be changed as
> the partitions are not redistributed
> 3) Any state is tied to the split, which is in turn tied to the worker.
> This means outside of, say, a global window,
> materialized kafka state is "localized" to a worker.
>
> Follow up Q:
> 4) Is this independent of the runner? I am much more familiar with Apache
> Spark as a runner than say, Dataflow.
>
> If any could confirm or refute my 3 statements and 1 question, it would go
> a long way towards validating my understanding of Beam's current
> relationship to scaling and partitioned data locality with Kafka.
>
> Thanks
>
>
>


Re: Memory Issue When Running Beam On Flink

2020-08-11 Thread Jan Lukavský

Hi David,

what's the state backend you use for Flink? The default probably would 
be FsStateBackend, which stores the whole state in the memory of the TaskManager. 
That could explain the behavior you are seeing, as the deduplication has 
to store all seen keys in memory. I'm afraid that although the key is 
cleared after the timeout, the key still leaves a record in the 
TaskManager's heap (this is the case for the non-portable runner; there are 
some timers associated with each key, set for the end of the window - this 
_might_ need fixing, as it seems suboptimal). You can confirm this by 
taking a heap dump of the TaskManager and looking for timer-related objects.


You can try a different state backend - RocksDBStateBackend, which would 
store the data on disk. However, this state backend has known (memory 
related) issues when running on k8s; these issues were fixed in Flink 
1.10.0, so you probably would have to use at least that version (or 
better, 1.10.1). Another consideration is that storing state in 
RocksDB has performance implications (because data is located on disk).


Another (sort of hackish, but maybe actually useful) solution could be 
to apply the deduplication in two successive fixed overlapping windows 
(e.g. two 1-minute windows, shifted by 30 seconds), because when a window 
expires, the timers should be cleared, and that could limit the number of 
keys actually held in memory. There need to be two deduplications, 
because events on the boundary of the first window would not be deduplicated.
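
A rough sketch of that two-stage idea (illustrative only; 'messages' stands for
the input PCollection and 'DeduplicateById' for the Deduplicate transform copied
from 2.22.0, keyed by message id):

    import apache_beam as beam
    from apache_beam.transforms.window import FixedWindows

    # Two overlapping 60s windowings, shifted by 30s, each followed by a
    # deduplication, so duplicates straddling a boundary of the first
    # windowing are still caught by the second one.
    deduped = (
        messages
        | 'Window60s' >> beam.WindowInto(FixedWindows(60))
        | 'Dedup1' >> DeduplicateById()
        | 'Window60sShifted' >> beam.WindowInto(FixedWindows(60, offset=30))
        | 'Dedup2' >> DeduplicateById())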


Hope this helps,

Jan

On 8/10/20 10:14 PM, David Gogokhiya wrote:
It takes roughly multiple days (~5 days) to reach the memory limit. It 
looks like Beam's last operator stops producing any events (image link) 
once the taskmanager's memory 
usage hits its limit (image link). After that, Beam is 
stuck in this degraded state, not being able to produce any events. 
It's worth noting that a regular cluster restart, keeping the 
previous state, doesn't help. Immediately after the restart, the 
taskmanager's memory usage goes back to its pre-restart value. 
Beam still doesn't produce any events at this point. The only thing 
that helps is restarting the cluster and dropping the previously 
saved state. Only in this case does Beam start functioning as expected.


I am still trying to understand whether the infinitely growing 
taskmanager memory usage is expected behavior or not.


Sincerely,
David

On Thu, Aug 6, 2020 at 3:19 PM David Gogokhiya > wrote:


Hi,
We recently started using Apache Beam version 2.20.0 running on
Flink version 1.9 deployed on kubernetes to process unbounded
streams of data. However, we noticed that the memory consumed by
stateful Beam is steadily increasing over time with no drops no
matter what the current bandwidth is. We were wondering if this is
expected and if not what would be the best way to resolve it.


  More Context

We have the following pipeline that consumes messages from the
unbounded stream of data. Later we deduplicate the messages based
on a unique message id using the deduplicate function.
Since we are using Beam version 2.20.0, we copied the source code
of the deduplicate function from
version 2.22.0. After that we unmap the tuple, retrieve the
necessary data from the message payload and dump the corresponding
data into the log.


Pipeline:


Flink configuration:


As we mentioned before, we noticed that the memory usage of the
jobmanager and taskmanager pods is steadily increasing with no
drops, no matter what the current bandwidth is. We tried allocating
more memory, but it seems like no matter how much memory we
allocate, it eventually reaches its limit and then it tries to
restart itself.

Sincerely, David