Re: Output timestamp for Python event timers

2020-08-12 Thread Boyuan Zhang
Thanks for your help! I'll take a look at the PR.

On Wed, Aug 12, 2020 at 2:27 AM Maximilian Michels wrote:

> Thanks for your suggestions!
>
> It makes sense to complete the work on this feature by exposing it in
> the Python API. We can do this as a next step. (There might be questions
> on how to do that exactly)
>
> For now, I'm concerned with getting the semantics right and unblocking
> users from stalling pipelines.
>
> I wasn't aware that processing timers used the input timestamp as the
> timer output timestamp. I've updated the PR accordingly. Please take a
> look: https://github.com/apache/beam/pull/12531
>
> -Max
>
> On 12.08.20 05:03, Luke Cwik wrote:
> > +1 on what Boyuan said. It is important that the defaults for processing
> > time domain differ from the defaults for the event time domain.
> >
> > On Tue, Aug 11, 2020 at 12:36 PM Yichi Zhang wrote:
> >
> > +1 to expose set_output_timestamp and enrich python set timer api.
> >
> > On Tue, Aug 11, 2020 at 12:01 PM Boyuan Zhang wrote:
> >
> > Hi Maximilian,
> >
> > It makes sense to set hold_timestamp as fire_timestamp when the
> > fire_timestamp is in the event time domain. Otherwise, the
> > system may advance the watermark incorrectly.
> > I think we can do something similar to Java FnApiRunner[1]:
> >
> >   * Expose set_output_timestamp API to python timer as well
> >   * If set_output_timestamp is not specified and timer is in
> >     event domain, we can use fire_timestamp as hold_timestamp
> >   * Otherwise, use input_timestamp as hold_timestamp.
> >
> > What do you think?
> >
> > [1]
> > https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493
> >
> > On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > We ran into problems setting event time timers per-element in the Python
> > SDK. Pipeline progress would stall.
> >
> > Turns out, although the Python SDK does not expose the timer output
> > timestamp feature to the user, it sets the timer output timestamp to the
> > current input timestamp of an element.
> >
> > This will lead to holding back the watermark until the timer fires (the
> > Flink Runner respects the timer output timestamp when advancing the
> > output watermark). We had set the fire timestamp to a timestamp so far
> > in the future, that pipeline progress would completely stall for
> > downstream transforms, due to the held back watermark.
> >
> > Considering that this feature is not even exposed to the user in the
> > Python SDK, I think we should set the default output timestamp to the
> > fire timestamp, and not to the input timestamp. This is also how timers
> > work in the Java SDK.
> >
> > Let me know what you think.
> >
> > -Max
> >
> > PR: https://github.com/apache/beam/pull/12531
> >
>


Re: Output timestamp for Python event timers

2020-08-12 Thread Maximilian Michels

Thanks for your suggestions!

It makes sense to complete the work on this feature by exposing it in 
the Python API. We can do this as a next step. (There might be questions 
on how to do that exactly)


For now, I'm concerned with getting the semantics right and unblocking 
users from stalling pipelines.


I wasn't aware that processing timers used the input timestamp as the 
timer output timestamp. I've updated the PR accordingly. Please take a 
look: https://github.com/apache/beam/pull/12531


-Max

On 12.08.20 05:03, Luke Cwik wrote:
+1 on what Boyuan said. It is important that the defaults for processing 
time domain differ from the defaults for the event time domain.


On Tue, Aug 11, 2020 at 12:36 PM Yichi Zhang wrote:


+1 to expose set_output_timestamp and enrich python set timer api.

On Tue, Aug 11, 2020 at 12:01 PM Boyuan Zhang <boyu...@google.com> wrote:

Hi Maximilian,

It makes sense to set hold_timestamp as fire_timestamp when the
fire_timestamp is in the event time domain. Otherwise, the
system may advance the watermark incorrectly.
I think we can do something similar to Java FnApiRunner[1]:

  * Expose set_output_timestamp API to python timer as well
  * If set_output_timestamp is not specified and timer is in
    event domain, we can use fire_timestamp as hold_timestamp
  * Otherwise, use input_timestamp as hold_timestamp.

What do you think?

[1]
https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493

On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels <m...@apache.org> wrote:

We ran into problems setting event time timers per-element in the Python
SDK. Pipeline progress would stall.

Turns out, although the Python SDK does not expose the timer output
timestamp feature to the user, it sets the timer output timestamp to the
current input timestamp of an element.

This will lead to holding back the watermark until the timer fires (the
Flink Runner respects the timer output timestamp when advancing the
output watermark). We had set the fire timestamp to a timestamp so far
in the future, that pipeline progress would completely stall for
downstream transforms, due to the held back watermark.

Considering that this feature is not even exposed to the user in the
Python SDK, I think we should set the default output timestamp to the
fire timestamp, and not to the input timestamp. This is also how timers
work in the Java SDK.

Let me know what you think.

-Max

PR: https://github.com/apache/beam/pull/12531



Re: Output timestamp for Python event timers

2020-08-11 Thread Luke Cwik
+1 on what Boyuan said. It is important that the defaults for processing
time domain differ from the defaults for the event time domain.

On Tue, Aug 11, 2020 at 12:36 PM Yichi Zhang wrote:

> +1 to expose set_output_timestamp and enrich python set timer api.
>
> On Tue, Aug 11, 2020 at 12:01 PM Boyuan Zhang wrote:
>
>> Hi Maximilian,
>>
>> It makes sense to set hold_timestamp as fire_timestamp when the
>> fire_timestamp is in the event time domain. Otherwise, the system may
>> advance the watermark incorrectly.
>> I think we can do something similar to Java FnApiRunner[1]:
>>
>>    - Expose set_output_timestamp API to python timer as well
>>    - If set_output_timestamp is not specified and timer is in event
>>      domain, we can use fire_timestamp as hold_timestamp
>>    - Otherwise, use input_timestamp as hold_timestamp.
>>
>> What do you think?
>>
>> [1]
>> https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493
>>
>>
>>
>>
>> On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels wrote:
>>
>>> We ran into problems setting event time timers per-element in the Python
>>> SDK. Pipeline progress would stall.
>>>
>>> Turns out, although the Python SDK does not expose the timer output
>>> timestamp feature to the user, it sets the timer output timestamp to the
>>> current input timestamp of an element.
>>>
>>> This will lead to holding back the watermark until the timer fires (the
>>> Flink Runner respects the timer output timestamp when advancing the
>>> output watermark). We had set the fire timestamp to a timestamp so far
>>> in the future, that pipeline progress would completely stall for
>>> downstream transforms, due to the held back watermark.
>>>
>>> Considering that this feature is not even exposed to the user in the
>>> Python SDK, I think we should set the default output timestamp to the
>>> fire timestamp, and not to the input timestamp. This is also how timers
>>> work in the Java SDK.
>>>
>>> Let me know what you think.
>>>
>>> -Max
>>>
>>> PR: https://github.com/apache/beam/pull/12531
>>>
>>


Re: Output timestamp for Python event timers

2020-08-11 Thread Yichi Zhang
+1 to expose set_output_timestamp and enrich python set timer api.

On Tue, Aug 11, 2020 at 12:01 PM Boyuan Zhang wrote:

> Hi Maximilian,
>
> It makes sense to set hold_timestamp as fire_timestamp when the
> fire_timestamp is in the event time domain. Otherwise, the system may
> advance the watermark incorrectly.
> I think we can do something similar to Java FnApiRunner[1]:
>
>    - Expose set_output_timestamp API to python timer as well
>    - If set_output_timestamp is not specified and timer is in event
>      domain, we can use fire_timestamp as hold_timestamp
>    - Otherwise, use input_timestamp as hold_timestamp.
>
> What do you think?
>
> [1]
> https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493
>
>
>
>
> On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels wrote:
>
>> We ran into problems setting event time timers per-element in the Python
>> SDK. Pipeline progress would stall.
>>
>> Turns out, although the Python SDK does not expose the timer output
>> timestamp feature to the user, it sets the timer output timestamp to the
>> current input timestamp of an element.
>>
>> This will lead to holding back the watermark until the timer fires (the
>> Flink Runner respects the timer output timestamp when advancing the
>> output watermark). We had set the fire timestamp to a timestamp so far
>> in the future, that pipeline progress would completely stall for
>> downstream transforms, due to the held back watermark.
>>
>> Considering that this feature is not even exposed to the user in the
>> Python SDK, I think we should set the default output timestamp to the
>> fire timestamp, and not to the input timestamp. This is also how timers
>> work in the Java SDK.
>>
>> Let me know what you think.
>>
>> -Max
>>
>> PR: https://github.com/apache/beam/pull/12531
>>
>


Re: Output timestamp for Python event timers

2020-08-11 Thread Boyuan Zhang
Hi Maximilian,

It makes sense to set hold_timestamp as fire_timestamp when the
fire_timestamp is in the event time domain. Otherwise, the system may
advance the watermark incorrectly.
I think we can do something similar to the Java FnApiDoFnRunner [1]:

   - Expose set_output_timestamp API to python timer as well
   - If set_output_timestamp is not specified and timer is in event domain,
   we can use fire_timestamp as hold_timestamp
   - Otherwise, use input_timestamp as hold_timestamp.
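
The three rules above can be sketched as a small selection function. This is only an illustration of the proposed defaults, not the Beam implementation: `TimeDomain`, `default_hold_timestamp`, and the parameter names are hypothetical, and timestamps are plain numbers rather than Beam `Timestamp` objects.

```python
from enum import Enum
from typing import Optional


class TimeDomain(Enum):
    # Hypothetical stand-in for the SDK's time domain enum.
    EVENT_TIME = "event_time"
    PROCESSING_TIME = "processing_time"


def default_hold_timestamp(
    time_domain: TimeDomain,
    fire_timestamp: float,
    input_timestamp: float,
    output_timestamp: Optional[float] = None,
) -> float:
    """Pick the timestamp at which a timer holds the output watermark."""
    if output_timestamp is not None:
        # The user called the (proposed) set_output_timestamp API.
        return output_timestamp
    if time_domain is TimeDomain.EVENT_TIME:
        # Event time timer without an explicit output timestamp.
        return fire_timestamp
    # Processing time timer: fall back to the input element's timestamp.
    return input_timestamp
```

With these defaults, an event time timer set for timestamp 100 on an element with timestamp 10 holds the watermark at 100, while a processing time timer set on the same element holds it at 10.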

What do you think?

[1]
https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493




On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels wrote:

> We ran into problems setting event time timers per-element in the Python
> SDK. Pipeline progress would stall.
>
> Turns out, although the Python SDK does not expose the timer output
> timestamp feature to the user, it sets the timer output timestamp to the
> current input timestamp of an element.
>
> This will lead to holding back the watermark until the timer fires (the
> Flink Runner respects the timer output timestamp when advancing the
> output watermark). We had set the fire timestamp to a timestamp so far
> in the future, that pipeline progress would completely stall for
> downstream transforms, due to the held back watermark.
>
> Considering that this feature is not even exposed to the user in the
> Python SDK, I think we should set the default output timestamp to the
> fire timestamp, and not to the input timestamp. This is also how timers
> work in the Java SDK.
>
> Let me know what you think.
>
> -Max
>
> PR: https://github.com/apache/beam/pull/12531
>


Output timestamp for Python event timers

2020-08-11 Thread Maximilian Michels
We ran into problems setting event time timers per-element in the Python 
SDK. Pipeline progress would stall.


Turns out, although the Python SDK does not expose the timer output 
timestamp feature to the user, it sets the timer output timestamp to the 
current input timestamp of an element.


This will lead to holding back the watermark until the timer fires (the 
Flink Runner respects the timer output timestamp when advancing the 
output watermark). We had set the fire timestamp to a timestamp so far 
in the future, that pipeline progress would completely stall for 
downstream transforms, due to the held back watermark.
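
The stall can be illustrated with a deliberately simplified model, in which the output watermark is the minimum of the input watermark and all active timer holds. This is an assumption made for illustration; real runners track watermarks in more detail, and `output_watermark`, `INPUT_TS`, and `FIRE_TS` are made-up names and values.

```python
from typing import Iterable, List


def output_watermark(input_watermark: float, holds: Iterable[float]) -> float:
    """Simplified model: holds cap how far the output watermark can advance."""
    return min([input_watermark, *holds])


INPUT_TS = 10.0        # timestamp of the element that set the timer
FIRE_TS = 1_000_000.0  # event time timer set far in the future

# The input watermark keeps advancing while the timer is still pending.
input_watermarks: List[float] = [20.0, 30.0, 40.0]

# Hold at the input timestamp: the output watermark is pinned.
stalled = [output_watermark(wm, [INPUT_TS]) for wm in input_watermarks]

# Hold at the fire timestamp: the output watermark follows the input.
advancing = [output_watermark(wm, [FIRE_TS]) for wm in input_watermarks]

print(stalled)    # [10.0, 10.0, 10.0]
print(advancing)  # [20.0, 30.0, 40.0]
```

In the first case downstream transforms never see the watermark pass 10 until the timer fires, which matches the stall described above.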


Considering that this feature is not even exposed to the user in the 
Python SDK, I think we should set the default output timestamp to the 
fire timestamp, and not to the input timestamp. This is also how timers
work in the Java SDK.


Let me know what you think.

-Max

PR: https://github.com/apache/beam/pull/12531