Re: [VOTE] Release 2.20.0, release candidate #2

2020-04-11 Thread Aaron Dixon
+1

- Ran full Clojure ports of all mobile gaming demos ([1],[2]): user score,
hourly team score, leaderboard, game stats, and stateful team score using
thurber[3]
 ...using 2.20.0 + JDK8 + Dataflow
- Ran thurber test suite (DirectRunner, org.apache.beam.sdk.testing) which
exercises SplittableDoFn (had to make slight changes to what seem to be new
semantics/annotations vis-à-vis @Element, @Restriction, etc.)

[1] https://github.com/atdixon/thurber/tree/master/demo/game
[2]
https://github.com/atdixon/thurber/blob/master/doc/running-mobile-gaming-examples.md
[3] https://github.com/atdixon/thurber

On Sat, Apr 11, 2020 at 12:24 AM Jean-Baptiste Onofre 
wrote:

> +1 (binding)
>
> Quickly checked on beam-samples.
>
> Regards
> JB
>
> On 10 Apr 2020, at 17:25, Kenneth Knowles  wrote:
>
> +1
>
> Ran a small Java pipeline.
>
> Noting that the version in gradle.properties is 2.20.0-RC2:
> https://github.com/apache/beam/blob/v2.20.0-RC2/gradle.properties#L26.
> The versions in the built Java artifacts all seem to be the desired value
> of 2.20.0.
>
> Kenn
>
> On Thu, Apr 9, 2020 at 5:19 PM Robert Bradshaw 
> wrote:
>
>> +1, the artifacts and signatures all look good, and I also checked that
>> the Python wheels work with a simple pipeline in a fresh virtual
>> environment.
>>
>> On Thu, Apr 9, 2020 at 5:11 PM Ahmet Altay  wrote:
>>
>>> +1 - validated python quickstarts batch/streaming with python 2.7.
>>>
>>> Thank you Rui!
>>>
>>> On Thu, Apr 9, 2020 at 12:28 PM Valentyn Tymofieiev 
>>> wrote:
>>>
 +1. Checked mobile gaming batch examples, and a streaming quickstart on
 Dataflow, on Python 3.7 using Linux wheels.

 On Thu, Apr 9, 2020 at 11:13 AM Rui Wang  wrote:

> Hi everyone,
> Please review and vote on the release candidate #2 for the version
> 2.20.0, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release to be deployed to dist.apache.org 
> [2],
> which is signed with the key with fingerprint 699A 22D2 D4F0 0AD3 957B
>   6A88 38B1 C6B4 25EB A67C [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "v2.20.0-RC2" [5],
> * website pull request listing the release [6], publishing the API
> reference manual [7], and the blog post [8].
> * Java artifacts were built with Maven 3.6.2 and OpenJDK 1.8.0_181.
> * Python artifacts are deployed along with the source release to the
> dist.apache.org [2].
> * Validation sheet with a tab for 2.20.0 release to help with
> validation [9].
> * Docker images published to Docker Hub [10].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Release Manager
>
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12346780
> [2] https://dist.apache.org/repos/dist/dev/beam/2.20.0/
> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapachebeam-1101/
> [5] https://github.com/apache/beam/tree/v2.20.0-RC2
> [6] https://github.com/apache/beam/pull/11285
> [7] https://github.com/apache/beam-site/pull/602
> [8] https://github.com/apache/beam/pull/11298
> [9]
> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=318600984
> https://hub.docker.com/search?q=apache%2Fbeam&type=image
>

>


Re: Beam's Avro 1.8.x dependency

2020-01-17 Thread Aaron Dixon
@Tomo Suzuki Thanks for looking at this and your thorough analysis -- let me
know if I can help with any regression testing, CRs, more context, or
anything else.

---

@Elliotte I was a little confused at first re: vendoring vs. shading, but
here is how I understand it (happy to be corrected if anything I say here
is a mischaracterization); I tend to over-explain for my own clarity,
fwiw -- using Guava as an example:

With SHADING you have your project (e.g., Beam) depend directly on Guava
and its public package names. Your build will then compile and link to
these package names and only after-the-fact [1] rewrite the byte-code to
rename the guava package. This entails both byte-code rewriting the Guava
class files (to rename the packages of the Guava code itself) and byte-code
rewriting all of your project class files that import Guava (to change the
import to use the new package name).

With VENDORING you release a renamed Guava ("vendor") JAR and depend on
that directly from your project. So your project's source code itself
imports the (repackaged) vendor JAR and depends on the
renamed packages directly. Your project build then has no need for shading
or byte code rewriting of its own classes. I see vendoring making it very
clear what is going on when looking at a project's source code and its
dependencies, and there are likely other tangible brass-tacks benefits to
vendoring besides the conceptual clarity over shading.

[1] You can see that Maven shade:shade goal binds by default to Maven's
package phase (which is well after compilation).
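
To make the contrast concrete from the consumer's side (a sketch; the
vendored package name below follows the style of Beam's vendored Guava, but
treat the exact coordinates as illustrative):

// Shading: code is written against the public package name; the shade step
// later rewrites this import (and Guava's own bytecode) to a relocated name.
import com.google.common.collect.ImmutableList;

// Vendoring: code imports the relocated package directly, so no post-compile
// rewriting of the project's own classes is needed.
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;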

On Fri, Jan 17, 2020 at 9:16 AM Tomo Suzuki  wrote:

> Ismaël and I think the short-term solution for Aaron's issue is to
> define Beam's own class for Avro's TimestampConversion. Luckily it's
> the only blocker for Avro 1.9 I found.
> I created a ticket https://issues.apache.org/jira/browse/BEAM-9144 and
> PR https://github.com/apache/beam/pull/10628 .
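
The idea, roughly sketched: a Beam-owned Joda-time conversion so Beam no
longer references Avro 1.8's TimeConversions.TimestampConversion directly
(class name and placement here are illustrative; the PR above has the
actual change):

import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

// Illustrative sketch of a Beam-owned "timestamp-millis" conversion; the
// real class in the PR may differ in name, package, and details.
public class JodaTimestampConversion extends Conversion<DateTime> {
  @Override
  public Class<DateTime> getConvertedType() {
    return DateTime.class;
  }

  @Override
  public String getLogicalTypeName() {
    return "timestamp-millis";
  }

  @Override
  public DateTime fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
    return new DateTime(millisFromEpoch, DateTimeZone.UTC);
  }

  @Override
  public Long toLong(DateTime timestamp, Schema schema, LogicalType type) {
    return timestamp.getMillis();
  }
}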
>
> Regards,
> Tomo
>
> On Fri, Jan 17, 2020 at 7:32 AM Elliotte Rusty Harold
>  wrote:
> >
> > On Thu, Jan 16, 2020 at 7:08 PM Kenneth Knowles  wrote:
> > >
> > > Regarding Google's advice about shading: don't go to a "one version
> rule" monorepo for advice about solving diamond dependencies in the wild.
> >
> > FWIW these docs came out of the open source, Github, multirepo, many
> > versions part of Google Cloud, not the mono repo world of google3.
> > Though I will say the more time I spend trying to deal with these
> > issues the more I love the monorepo.
> >
> > > It is a useful description of the pitfalls. We (and Flink before us,
> and likely many more) are already doing something that avoids many of them
> or makes them less likely. Building a separate vendored library is simpler
> and more robust than the "shade during build".
> > >
> >
> > I've only encountered "vendoring" as distinct from shading in Beam.
> > Are there more details about this anywhere? A quick Google search only
> > turned up one Beam doc that pointed to Go docs and a cloud endpoints
> > doc that seemed to use it as a synonym for shading.
> >
> >
> > --
> > Elliotte Rusty Harold
> > elh...@ibiblio.org
>
>
>
> --
> Regards,
> Tomo
>


Re: Beam's Avro 1.8.x dependency

2020-01-16 Thread Aaron Dixon
Looks like there's some strategy to get to the right solution here and that
it will likely involve breaking compatibility.

One option for myself would be to strip the Beam JAR of AvroCoder and
combine with the old AvroCoder from Beam 2.16 -- this would allow me to
upgrade Beam but of course is rather hacky.

On second thought, was the breaking change from Beam 2.16->2.17 really
necessary? If not, could AvroCoder be restored to a 1.9.x "compatible"
implementation and kept this way for the Beam 2.1x version lineage?

This seems like a somewhat fair ask given the way that I'm suddenly blocked
-- however, I do realize this is somewhat of a technicality; i.e., Beam
2.16's compatibility with my usage of Avro 1.9.x was incidental.

But, still, if the changes to AvroCoder weren't necessary, restoring them
would unblock me and anyone else using Avro 1.9.x (surely I'm not the only
one!?).


On Thu, Jan 16, 2020 at 12:22 PM Elliotte Rusty Harold 
wrote:

> Avro does not follow semver. They update the major version when the
> serialization format changes and the minor version when the API
> changes in a backwards incompatible way. See
> https://issues.apache.org/jira/browse/AVRO-2687
>
> On Thu, Jan 16, 2020 at 12:50 PM Luke Cwik  wrote:
> >
> > Does Avro not follow semantic versioning (upgrading to 1.9 should have
> been backwards compatible), or does our usage reach into the internals of
> Avro?
> >
> > On Thu, Jan 16, 2020 at 6:16 AM Ismaël Mejía  wrote:
> >>
> >> I forgot to explain why the most obvious path (just upgrade Avro to
> >> version 1.9.x) is not a valid long term solution. Other systems Beam runs
> >> on top of (e.g. Spark!) also leak Avro into their core, so the moment
> >> Spark moves up to Avro 1.9.x, Spark runner users will be in a really
> >> fragile position where things will work until they don't (similar to
> >> Aaron's case) -- all the more reason to get Avro out of Beam core.
> >>
> >>
> >> On Thu, Jan 16, 2020 at 1:59 PM Elliotte Rusty Harold <
> elh...@ibiblio.org> wrote:
> >>>
> >>> Shading should be a last resort:
> >>>
> >>> https://jlbp.dev/JLBP-18.html
> >>>
> >>> It tends to cause more problems than it solves. At best it's a stopgap
> >>> measure when you don't have the resources to fix the real problem. In
> >>> this case it sounds like the real issue is that AVRO is not stable.
> >>> There are at least three other solutions in a case like this:
> >>>
> >>> 1. Fix Avro at the root.
> >>> 2. Fork Avro and then fix it.
> >>> 3. Stop depending on Avro.
> >>>
> >>> None of these are trivial which is why shading gets considered.
> >>> However shading doesn't fix the underlying problems, and ultimately
> >>> makes a product as unreliable as its least reliable dependency. :-(
> >>>
> >>> On Thu, Jan 16, 2020 at 2:01 AM jincheng sun 
> wrote:
> >>> >
> >>> > I found that there are several dependencies shaded and planned to
> made as vendored artifacts in [1]. I'm not sure why Avro is not shaded
> before. From my point of view, it's a good idea to shade Avro and make it a
> vendored artifact if there are no special reasons blocking us from doing that.
> Regarding how to create a vendored artifact, you can refer to [2] for
> more details.
> >>> >
> >>> > Best,
> >>> > Jincheng
> >>> >
> >>> > [1] https://issues.apache.org/jira/browse/BEAM-5819
> >>> > [2] https://github.com/apache/beam/blob/master/vendor/README.md
> >>> >
> >>> >
> >>> > Tomo Suzuki  于2020年1月16日周四 下午1:18写道:
> >>> >>
> >>> >> I've been upgrading dependencies around gRPC. This Avro-problem is
> >>> >> interesting to me.
> >>> >> I'll study BEAM-8388 more tomorrow.
> >>> >>
> >>> >> On Wed, Jan 15, 2020 at 10:51 PM Luke Cwik 
> wrote:
> >>> >> >
> >>> >> > +Tomo Suzuki +jincheng sun
> >>> >> > There have been a few contributors upgrading the dependencies and
> validating things not breaking by running the majority of the post commit
> integration tests and also using the linkage checker to show that we aren't
> worse off with respect to our dependency tree. Reaching out to them to help
> you is your best bet of getting these upgrades through.
> >>> >> >
> >>> >> > On Wed, Jan 15, 2020 at 6:52 PM Aaron Dixon 
> wrote:
>

Re: Beam's Avro 1.8.x dependency

2020-01-15 Thread Aaron Dixon
I meant to mention that we must use Avro 1.9.x, as we rely on some schema
resolution fixes not present in 1.8.x -- so I am indeed blocked.

On Wed, Jan 15, 2020 at 8:50 PM Aaron Dixon  wrote:

> It looks like Beam's Avro version dependency has come up in the past
> [1, 2].
>
> I'm currently on Beam 2.16.0, which has been compatible with my usage of
> Avro 1.9.x.
>
> But upgrading to Beam 2.17.0 is not possible for us now that 2.17.0 has
> some dependencies on Avro classes only available in 1.8.x.
>
> Wondering if anyone else is similarly blocked, and what it would take to
> prioritize upgrading Beam to Avro 1.9.x -- or, better, using a shaded
> version so that clients can use their own Avro version for their own coding
> purposes. (E.g., I parse Avro messages from a KafkaIO source and need 1.9.x
> for this, but am perfectly happy if Beam's Avro coding facilities used a
> shaded copy of some other version.)
>
> I've made a comment on BEAM-8388 [1] to this effect, but am polling the
> community for discussion.
>
> [1] https://issues.apache.org/jira/browse/BEAM-8388
> [2] https://github.com/apache/beam/pull/9779
>
>


Beam's Avro 1.8.x dependency

2020-01-15 Thread Aaron Dixon
It looks like Beam's Avro version dependency has come up in the past [1,
2].

I'm currently on Beam 2.16.0, which has been compatible with my usage of
Avro 1.9.x.

But upgrading to Beam 2.17.0 is not possible for us now that 2.17.0 has
some dependencies on Avro classes only available in 1.8.x.

Wondering if anyone else is similarly blocked, and what it would take to
prioritize upgrading Beam to Avro 1.9.x -- or, better, using a shaded
version so that clients can use their own Avro version for their own coding
purposes. (E.g., I parse Avro messages from a KafkaIO source and need 1.9.x
for this, but am perfectly happy if Beam's Avro coding facilities used a
shaded copy of some other version.)

I've made a comment on BEAM-8388 [1] to this effect, but am polling the
community for discussion.

[1] https://issues.apache.org/jira/browse/BEAM-8388
[2] https://github.com/apache/beam/pull/9779


Re: No AfterWatermark firings in Dataflow

2020-01-13 Thread Aaron Dixon
Kenn, thank you! There is OnTimeBehavior (default FIRE_ALWAYS) and
ClosingBehavior (default FIRE_IF_NON_EMPTY). Given that OnTimeBehavior is
always-fire, shouldn't I see empty ON_TIME panes?

Since my lateness config is 0, I'm going to try ClosingBehavior =
FIRE_ALWAYS and see if I can rely on .isLast() to pick out the last pane
downstream. But curious if given that the OnTimeBehavior default is ALWAYS,
shouldn't I be seeing on-time panes in my current config?
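
For reference, the configuration I mean looks like this (a sketch, assuming
withOnTimeBehavior and the two-argument withAllowedLateness overload are
available in the Beam version at hand):

.apply(Window
  .configure()
  .triggering(AfterWatermark
    .pastEndOfWindow()
    .withEarlyFirings(AfterPane
      .elementCountAtLeast(1)))
  .accumulatingFiredPanes()
  // Fire a final pane on window close even if it is empty, so .isLast()
  // can be relied upon downstream.
  .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
  // Emit the ON_TIME pane even when the early firings consumed everything
  // (documented as the default).
  .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS))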



On Mon, Jan 13, 2020 at 6:45 PM Kenneth Knowles  wrote:

> On my phone, so I can't grab the jira so easily, but quickly: EARLY panes
> are "race condition equivalent" to ON_TIME panes. The early panes consume
> all the pending elements then the on time pane is "empty". This is WAI if
> it is what is causing it. You need to explicitly set
> Window.configure().fireAlways()*. I know this is counterintuitive in
> accumulating mode, where the empty pane is not the identity element.
>
> Kenn
>
> *I don't recall if this is the default or not, and also because on phone
> it is slow to look up. From your experience I think not default.
>
> On Mon, Jan 13, 2020, 15:03 Aaron Dixon  wrote:
>
>> Any confirmation on this from anyone? Whether per Beam spec, runners are
>> obligated to send ON_TIME panes for AfterWatermark triggers? I'm stuck
>> because this seems fundamental, so it's hard to imagine this is a Dataflow
>> bug, but OTOH it's also hard to imagine that trigger specs like
>> AfterWatermark are "optional"... ?
>>
>> On Mon, Jan 13, 2020 at 4:18 PM Aaron Dixon  wrote:
>>
>>> Yes. Using calendar day-based windows, and the watermark is completely
>>> caught up to today ... the calendar window ends several days ago. I got
>>> EARLY panes for each element but never an ON_TIME pane.
>>>
>>> On Mon, Jan 13, 2020 at 4:16 PM Luke Cwik  wrote:
>>>
>>>> Is the watermark advancing past the end of the window?
>>>>
>>>> On Mon, Jan 13, 2020 at 2:02 PM Aaron Dixon  wrote:
>>>>
>>>>> The window is not empty fwiw; it has elements; I get an early firing
>>>>> pane for the window but well after the watermark passes there is no 
>>>>> ON_TIME
>>>>> pane. Would this be a bug in Dataflow? Seems fundamental, so I'm concerned
>>>>> perhaps the Beam spec doesn't obligate ON_TIME firings?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jan 13, 2020 at 3:58 PM Luke Cwik  wrote:
>>>>>
>>>>>> I would have expected an empty on time pane since the default on time
>>>>>> behavior is FIRE_ALWAYS.
>>>>>>
>>>>>> On Mon, Jan 13, 2020 at 1:54 PM Aaron Dixon 
>>>>>> wrote:
>>>>>>
>>>>>>> Can anyone confirm?
>>>>>>>
>>>>>>> This is intermittent. Some (it seems, sparse) windows don't get an
>>>>>>> ON_TIME firing after watermark. Is this a bug or is there a reason to 
>>>>>>> not
>>>>>>> expect ON_TIME firings for every window?
>>>>>>>
>>>>>>> On Mon, Jan 13, 2020 at 3:47 PM Rui Wang  wrote:
>>>>>>>
>>>>>>>> If it indeed happened as you have described, I will be very
>>>>>>>> interested in the expected behaviour.
>>>>>>>>
>>>>>>>> Something I remembered from before: the trigger condition being met
>>>>>>>> just gives the runner/engine "permission" to fire, but the
>>>>>>>> runner/engine may not fire immediately. And I don't know if the
>>>>>>>> engine/runner guarantees that it will fire.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Rui
>>>>>>>>
>>>>>>>> On Mon, Jan 13, 2020 at 1:43 PM Aaron Dixon 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I have the following trigger:
>>>>>>>>>
>>>>>>>>> .apply(Window
>>>>>>>>>   .configure()
>>>>>>>>>   .triggering(AfterWatermark
>>>>>>>>>     .pastEndOfWindow()
>>>>>>>>>     .withEarlyFirings(AfterPane
>>>>>>>>>       .elementCountAtLeast(1)))
>>>>>>>>>   .accumulatingFiredPanes()
>>>>>>>>>   .withAllowedLateness(Duration.ZERO))
>>>>>>>>>
>>>>>>>>> But in Dataflow I notice that I never get an ON_TIME firing for my
>>>>>>>>> window -- I only see early firing for elements, and then nothing.
>>>>>>>>>
>>>>>>>>> My assumption is that AfterWatermark should give me a last,
>>>>>>>>> on-time pane under this configuration when the watermark surpasses the
>>>>>>>>> window's end.
>>>>>>>>>
>>>>>>>>> Is my expectation correct?
>>>>>>>>>
>>>>>>>>


Re: No AfterWatermark firings in Dataflow

2020-01-13 Thread Aaron Dixon
Any confirmation on this from anyone? Whether per Beam spec, runners are
obligated to send ON_TIME panes for AfterWatermark triggers? I'm stuck
because this seems fundamental, so it's hard to imagine this is a Dataflow
bug, but OTOH it's also hard to imagine that trigger specs like
AfterWatermark are "optional"... ?

On Mon, Jan 13, 2020 at 4:18 PM Aaron Dixon  wrote:

> Yes. Using calendar day-based windows, and the watermark is completely
> caught up to today ... the calendar window ends several days ago. I got
> EARLY panes for each element but never an ON_TIME pane.
>
> On Mon, Jan 13, 2020 at 4:16 PM Luke Cwik  wrote:
>
>> Is the watermark advancing past the end of the window?
>>
>> On Mon, Jan 13, 2020 at 2:02 PM Aaron Dixon  wrote:
>>
>>> The window is not empty fwiw; it has elements; I get an early firing
>>> pane for the window but well after the watermark passes there is no ON_TIME
>>> pane. Would this be a bug in Dataflow? Seems fundamental, so I'm concerned
>>> perhaps the Beam spec doesn't obligate ON_TIME firings?
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jan 13, 2020 at 3:58 PM Luke Cwik  wrote:
>>>
>>>> I would have expected an empty on time pane since the default on time
>>>> behavior is FIRE_ALWAYS.
>>>>
>>>> On Mon, Jan 13, 2020 at 1:54 PM Aaron Dixon  wrote:
>>>>
>>>>> Can anyone confirm?
>>>>>
>>>>> This is intermittent. Some (it seems, sparse) windows don't get an
>>>>> ON_TIME firing after watermark. Is this a bug or is there a reason to not
>>>>> expect ON_TIME firings for every window?
>>>>>
>>>>> On Mon, Jan 13, 2020 at 3:47 PM Rui Wang  wrote:
>>>>>
>>>>>> If it indeed happened as you have described, I will be very
>>>>>> interested in the expected behaviour.
>>>>>>
>>>>>> Something I remembered from before: the trigger condition being met
>>>>>> just gives the runner/engine "permission" to fire, but the runner/engine
>>>>>> may not fire immediately. And I don't know if the engine/runner
>>>>>> guarantees that it will fire.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>> On Mon, Jan 13, 2020 at 1:43 PM Aaron Dixon 
>>>>>> wrote:
>>>>>>
>>>>>>> I have the following trigger:
>>>>>>>
>>>>>>> .apply(Window
>>>>>>>   .configure()
>>>>>>>   .triggering(AfterWatermark
>>>>>>>     .pastEndOfWindow()
>>>>>>>     .withEarlyFirings(AfterPane
>>>>>>>       .elementCountAtLeast(1)))
>>>>>>>   .accumulatingFiredPanes()
>>>>>>>   .withAllowedLateness(Duration.ZERO))
>>>>>>>
>>>>>>> But in Dataflow I notice that I never get an ON_TIME firing for my
>>>>>>> window -- I only see early firing for elements, and then nothing.
>>>>>>>
>>>>>>> My assumption is that AfterWatermark should give me a last, on-time
>>>>>>> pane under this configuration when the watermark surpasses the window's 
>>>>>>> end.
>>>>>>>
>>>>>>> Is my expectation correct?
>>>>>>>
>>>>>>


Re: No AfterWatermark firings in Dataflow

2020-01-13 Thread Aaron Dixon
Yes. Using calendar day-based windows, and the watermark is completely
caught up to today ... the calendar window ends several days ago. I got
EARLY panes for each element but never an ON_TIME pane.

On Mon, Jan 13, 2020 at 4:16 PM Luke Cwik  wrote:

> Is the watermark advancing past the end of the window?
>
> On Mon, Jan 13, 2020 at 2:02 PM Aaron Dixon  wrote:
>
>> The window is not empty fwiw; it has elements; I get an early firing pane
>> for the window but well after the watermark passes there is no ON_TIME
>> pane. Would this be a bug in Dataflow? Seems fundamental, so I'm concerned
>> perhaps the Beam spec doesn't obligate ON_TIME firings?
>>
>>
>>
>>
>>
>> On Mon, Jan 13, 2020 at 3:58 PM Luke Cwik  wrote:
>>
>>> I would have expected an empty on time pane since the default on time
>>> behavior is FIRE_ALWAYS.
>>>
>>> On Mon, Jan 13, 2020 at 1:54 PM Aaron Dixon  wrote:
>>>
>>>> Can anyone confirm?
>>>>
>>>> This is intermittent. Some (it seems, sparse) windows don't get an
>>>> ON_TIME firing after watermark. Is this a bug or is there a reason to not
>>>> expect ON_TIME firings for every window?
>>>>
>>>> On Mon, Jan 13, 2020 at 3:47 PM Rui Wang  wrote:
>>>>
>>>>> If it indeed happened as you have described, I will be very interested
>>>>> in the expected behaviour.
>>>>>
>>>>> Something I remembered from before: the trigger condition being met just
>>>>> gives the runner/engine "permission" to fire, but the runner/engine may
>>>>> not fire immediately. And I don't know if the engine/runner guarantees
>>>>> that it will fire.
>>>>>
>>>>>
>>>>>
>>>>> -Rui
>>>>>
>>>>> On Mon, Jan 13, 2020 at 1:43 PM Aaron Dixon  wrote:
>>>>>
>>>>>> I have the following trigger:
>>>>>>
>>>>>> .apply(Window
>>>>>>   .configure()
>>>>>>   .triggering(AfterWatermark
>>>>>>     .pastEndOfWindow()
>>>>>>     .withEarlyFirings(AfterPane
>>>>>>       .elementCountAtLeast(1)))
>>>>>>   .accumulatingFiredPanes()
>>>>>>   .withAllowedLateness(Duration.ZERO))
>>>>>>
>>>>>> But in Dataflow I notice that I never get an ON_TIME firing for my
>>>>>> window -- I only see early firing for elements, and then nothing.
>>>>>>
>>>>>> My assumption is that AfterWatermark should give me a last, on-time
>>>>>> pane under this configuration when the watermark surpasses the window's 
>>>>>> end.
>>>>>>
>>>>>> Is my expectation correct?
>>>>>>
>>>>>


Re: No AfterWatermark firings in Dataflow

2020-01-13 Thread Aaron Dixon
The window is not empty fwiw; it has elements; I get an early firing pane
for the window but well after the watermark passes there is no ON_TIME
pane. Would this be a bug in Dataflow? Seems fundamental, so I'm concerned
perhaps the Beam spec doesn't obligate ON_TIME firings?





On Mon, Jan 13, 2020 at 3:58 PM Luke Cwik  wrote:

> I would have expected an empty on time pane since the default on time
> behavior is FIRE_ALWAYS.
>
> On Mon, Jan 13, 2020 at 1:54 PM Aaron Dixon  wrote:
>
>> Can anyone confirm?
>>
>> This is intermittent. Some (it seems, sparse) windows don't get an
>> ON_TIME firing after watermark. Is this a bug or is there a reason to not
>> expect ON_TIME firings for every window?
>>
>> On Mon, Jan 13, 2020 at 3:47 PM Rui Wang  wrote:
>>
>>> If it indeed happened as you have described, I will be very interested
>>> in the expected behaviour.
>>>
>>> Something I remembered from before: the trigger condition being met just
>>> gives the runner/engine "permission" to fire, but the runner/engine may
>>> not fire immediately. And I don't know if the engine/runner guarantees
>>> that it will fire.
>>>
>>>
>>>
>>> -Rui
>>>
>>> On Mon, Jan 13, 2020 at 1:43 PM Aaron Dixon  wrote:
>>>
>>>> I have the following trigger:
>>>>
>>>> .apply(Window
>>>>   .configure()
>>>>   .triggering(AfterWatermark
>>>>     .pastEndOfWindow()
>>>>     .withEarlyFirings(AfterPane
>>>>       .elementCountAtLeast(1)))
>>>>   .accumulatingFiredPanes()
>>>>   .withAllowedLateness(Duration.ZERO))
>>>>
>>>> But in Dataflow I notice that I never get an ON_TIME firing for my
>>>> window -- I only see early firing for elements, and then nothing.
>>>>
>>>> My assumption is that AfterWatermark should give me a last, on-time
>>>> pane under this configuration when the watermark surpasses the window's 
>>>> end.
>>>>
>>>> Is my expectation correct?
>>>>
>>>


Re: No AfterWatermark firings in Dataflow

2020-01-13 Thread Aaron Dixon
Can anyone confirm?

This is intermittent. Some (it seems, sparse) windows don't get an ON_TIME
firing after watermark. Is this a bug or is there a reason to not expect
ON_TIME firings for every window?

On Mon, Jan 13, 2020 at 3:47 PM Rui Wang  wrote:

> If it indeed happened as you have described, I will be very interested in
> the expected behaviour.
>
> Something I remembered from before: the trigger condition being met just
> gives the runner/engine "permission" to fire, but the runner/engine may not
> fire immediately. And I don't know if the engine/runner guarantees that it
> will fire.
>
>
>
> -Rui
>
> On Mon, Jan 13, 2020 at 1:43 PM Aaron Dixon  wrote:
>
>> I have the following trigger:
>>
>> .apply(Window
>>   .configure()
>>   .triggering(AfterWatermark
>>     .pastEndOfWindow()
>>     .withEarlyFirings(AfterPane
>>       .elementCountAtLeast(1)))
>>   .accumulatingFiredPanes()
>>   .withAllowedLateness(Duration.ZERO))
>>
>> But in Dataflow I notice that I never get an ON_TIME firing for my window
>> -- I only see early firing for elements, and then nothing.
>>
>> My assumption is that AfterWatermark should give me a last, on-time pane
>> under this configuration when the watermark surpasses the window's end.
>>
>> Is my expectation correct?
>>
>


No AfterWatermark firings in Dataflow

2020-01-13 Thread Aaron Dixon
I have the following trigger:

.apply(Window
  .configure()
  .triggering(AfterWatermark
    .pastEndOfWindow()
    .withEarlyFirings(AfterPane
      .elementCountAtLeast(1)))
  .accumulatingFiredPanes()
  .withAllowedLateness(Duration.ZERO))

But in Dataflow I notice that I never get an ON_TIME firing for my window
-- I only see early firing for elements, and then nothing.

My assumption is that AfterWatermark should give me a last, on-time pane
under this configuration when the watermark surpasses the window's end.

Is my expectation correct?


Re: outputWithTimestamp

2020-01-13 Thread Aaron Dixon
Reuven, thank you much for your help and the clarity here; it's very
helpful.

Per your solution #2 -- this approach makes sense, seems semantically
right, and is something I'll explore when timer.withOutputTimestamp(t) is
released. Just for clarity: there is no other way in Beam
(mid-pipeline/post-Source) for me to effect a hold on the watermark today,
until this API is released, correct?
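
(Sketching what I understand solution #2 to look like once that API lands
-- the names, the one-hour expiry policy, and the (user, eventTimeMillis)
element shape are all hypothetical:)

// (org.apache.beam.sdk.state.*, org.apache.beam.sdk.values.KV, and
// org.joda.time.* imports assumed)
static class SessionFn extends DoFn<KV<String, Long>, Long> {
  @StateId("login")
  private final StateSpec<ValueState<Instant>> loginSpec = StateSpecs.value();

  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("login") ValueState<Instant> login,
      @TimerId("expiry") Timer expiry) {
    if (login.read() == null) {
      login.write(c.timestamp()); // first event of the session: the login
    }
    // The timer's output timestamp holds the watermark at the login time
    // while the session is open (the new API discussed above).
    expiry.withOutputTimestamp(login.read())
        .set(c.timestamp().plus(Duration.standardHours(1)));
  }

  @OnTimer("expiry")
  public void onExpiry(OnTimerContext c,
      @StateId("login") ValueState<Instant> login) {
    Instant loginTime = login.read();
    // Because of the hold, stamping the session at the login time is not
    // late output.
    c.outputWithTimestamp(c.timestamp().getMillis() - loginTime.getMillis(), loginTime);
    login.clear();
  }
}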

On Mon, Jan 13, 2020 at 1:22 AM Reuven Lax  wrote:

> Semantically though, since you want the CalendarWindow aggregation to be
> based on login timestamps, the watermark should be tracking the login
> timestamps. The watermark is a way for the CalendarWindow to know that as
> far as the system knows, there will be no more events that fall into that
> window. You say that long sessions are holding back the watermark, but
> that's exactly because those long sessions mean that there is still data
> pending for that CalendarWindow, so it is still incomplete! The above
> techniques might appear to solve this, but do so at the expense of somewhat
> randomly causing data to be late or worse dropped.
>
> There are a couple of ways I would address this:
>
> 1. The simplest would be to allow the watermark to track the login window,
> but put a trigger on the CalendarWindow (e.g. trigger every 10 seconds).
> That way whenever the trigger fires you can update the results so far for
> that window. This means that the majority of session that are complete can
> be output without needing to wait for the long sessions, yet the window
> will remain open waiting for those long sessions to complete.
>
> 2. Another possibility is to explicitly identify those extra-long
> sessions, and handle them differently. This I think is a better solution
> than the above timestampSkew solution, because it's deterministic: you know
> exactly which sessions you are handling differently. I would do this by
> using the state+timers API to calculate the sessions, instead of the
> sessions WindowFn. When a session is overly long, then you can stop setting
> the watermark hold for the login time, essentially removing that long
> session from the watermark calculation.
>
> One possibility for how to handle the long sessions "differently" would
> still involve using withAllowedTimestampSkew. This still risks losing some
> of these (if the skew ever happens to be larger than the static value you
> set, you'll not be about to output the session). However now you know
> you're limiting the skewed output to only those specific long sessions
> you've chosen, which is much better than emitting all records with skew and
> hoping that things work out.
>
> Reuven
>
> On Sun, Jan 12, 2020 at 12:07 PM Aaron Dixon  wrote:
>
>> Reuven thanks -- I understand each point although I'm trying to grapple
>> with your concerns expressed in #3; they don't seem avoidable even w/o the
>> allowedSkew feature.
>>
>> Considering your response I see a revision to my solution that omits
>> using the allowed skew configuration but as far as I can tell still has the
>> concerns from #3 (i.e., difficulty in reasoning about which events may be
>> dropped.)
>>
>> My pipeline using the skew config looks like this:
>>
>> (1) CustomSessionWindow
>> emits -> (user, login, logout) @ <end-of-session timestamp>
>> (2) ParDo
>> -> re-emits same tuple but w/ *login* timestamp
>> (requires custom allowed-skew)
>> (3) CalendarWindow
>> -> <per-day aggregation @ *login* timestamp>
>>
>> Instead, I can write a CustomCalendarWindow that places the tuple element
>> in the right window based on the *login* timestamp, avoiding the need for
>> the middle/skewing ParDo:
>>
>> (1) CustomSessionWindow
>> -> (user, login, logout) @ <end-of-session timestamp>
>> (2) CustomCalendarWindow
>> -> <*explicitly* places element in window based on the **login**
>> timestamp>
>>
>> So the use of the ParDo was simply a way to avoid having to write a
>> custom window; it essentially ensures the CalendarWindow windows based on
>> login time.
>>
>> But I don't see how your concerns in #3 are obviated by this revision.
>> Elements going in to the calendar window may be already late...this is
>> something that any (multi-stage) Beam pipeline has to contend with, even
>> without the deprecated allowedSkew facility, no?
>>
>> In other words both of these pipelines are semantically, behaviorally
>> identical. The former just had the benefit of not requiring a custom window
>> implementation.
>>
>>
>>
>>
>>
>>
>> On Sun, Jan 12, 2020 at 12:12 PM Reuven Lax  wrote:
>>
>>> A few comments:
>>>
>>> 1. Yes, this already works on Dataflow (

Re: outputWithTimestamp

2020-01-12 Thread Aaron Dixon
Reuven thanks -- I understand each point although I'm trying to grapple
with your concerns expressed in #3; they don't seem avoidable even w/o the
allowedSkew feature.

Considering your response I see a revision to my solution that omits using
the allowed skew configuration but as far as I can tell still has the
concerns from #3 (i.e., difficulty in reasoning about which events may be
dropped.)

My pipeline using the skew config looks like this:

(1) CustomSessionWindow
emits -> (user, login, logout) @ <end-of-session timestamp>
(2) ParDo
-> re-emits same tuple but w/ *login* timestamp
(requires custom allowed-skew)
(3) CalendarWindow
-> <per-day aggregation @ *login* timestamp>

Instead, I can write a CustomCalendarWindow that places the tuple element
in the right window based on the *login* timestamp, avoiding the need for
the middle/skewing ParDo:

(1) CustomSessionWindow
-> (user, login, logout) @ <end-of-session timestamp>
(2) CustomCalendarWindow
-> <*explicitly* places element in window based on the **login** timestamp>

So the use of the ParDo was simply a way to avoid having to write a custom
window; it essentially ensures the CalendarWindow windows based on login
time.
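
(For concreteness, a sketch of what that CustomCalendarWindow might look
like -- SessionTuple and loginTime() are placeholder names for my real
types:)

import java.util.Collection;
import java.util.Collections;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class LoginDayWindows extends NonMergingWindowFn<SessionTuple, IntervalWindow> {
  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
    // Window by the tuple's *login* field, ignoring the element timestamp;
    // an element whose timestamp falls after the login day lands in a
    // window already behind the watermark -- exactly the situation above.
    Instant login = c.element().loginTime();
    Instant dayStart =
        login.toDateTime(DateTimeZone.UTC).withTimeAtStartOfDay().toInstant();
    return Collections.singletonList(
        new IntervalWindow(dayStart, dayStart.plus(Duration.standardDays(1))));
  }

  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return other instanceof LoginDayWindows;
  }

  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  @Override
  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new UnsupportedOperationException("Not usable as a side-input window.");
  }
}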

But I don't see how your concerns in #3 are obviated by this revision.
Elements going into the calendar window may already be late... this is
something that any (multi-stage) Beam pipeline has to contend with, even
without the deprecated allowedSkew facility, no?

In other words both of these pipelines are semantically, behaviorally
identical. The former just had the benefit of not requiring a custom window
implementation.






On Sun, Jan 12, 2020 at 12:12 PM Reuven Lax  wrote:

> A few comments:
>
> 1. Yes, this already works on Dataflow (at Beam head). Flink support is
> pending at pr/10534.
>
> 2. Just to make sure where on the same page: getAllowedTimestampSkew is
> _not_ about outputting behind the watermark. Rather it's about outputting a
> timestamp that's less than the current input timestamp. If for example the
> watermark is 12:00 and the current input element has a timestamp of 11:00
> (because it's late), then  you can output an element at 11:00 with no need
> to set this parameter. It appears that the JavaDoc is somewhat confusing on
> this method.
>
> 3. The reason for this parameter is that the watermark only correctly
> tracks timestamps internal to the pipeline if your code doesn't make
> timestamps travel back in time - i.e. a ParDo taking an element with a
> timestamp of 12:00 and outputting another element. If you use
> getAllowedTimestampSkew your elements produced might not be tracked by the
> watermark and will show up late (even if the source element is on time).
> What's worse, there's a chance that the elements will be older than
> allowedLateness and will get dropped altogether (this can happen even if
> allowedTimestampSkew < maxAllowedLateness, because the input element might
> already be late and you'll then output an element that has an even earlier
> timestamp).
>
> 4. It sounds like you both want and don't want a watermark. You want the
> watermark to not be held up by your input (so that your aggregations keep
> triggering), but you then want to output old data which might prevent the
> watermark from working properly, and might cause data to be dropped. Have
> you considered instead using either triggers or timers to trigger your
> aggregations? That way you don't need to wait for the watermark to advance
> to the end of the window to trigger the aggregation, but the end-of-window
> aggregation will still be correct.
>
> Reuven
>
> On Sun, Jan 12, 2020 at 8:23 AM Aaron Dixon  wrote:
>
>> Reuven thanks for your insights so far. Just wanted to press a little
>> more on the deprecation question as I'm still (so far) convinced that my
>> use case is quite a straightforward justification (I'm looking for
>> confirmation or correction to my thinking here.) I've simplified my use
>> case a bit if it helps things:
>>
>> Use case: "For users that login on a given calendar day, what is the
>> average login time?"
>>
>> So I have two event types LOGIN and LOGOUT. I capture a user login
>> session (using custom windowing or state api, doesn't matter) and I use the
>> default TimestampCombiner/END_OF_WINDOW because I want my aggregations to
>> not be delayed.
>>
>> However per my use case requirements I must window using the LOGIN time.
>> So I use outputWithTimestamp plus skew configuration to this end.
>>
>> Since most of my users login and logout within the same calendar day, I
>> get my per-day aggregations right on time, in real time.
>>
>> Only for the few users that logout after the day that they login will I
>> see actual late aggregations produced in which case I can leverage Beam's
>> various late

Re: outputWithTimestamp

2020-01-12 Thread Aaron Dixon
Reuven thanks for your insights so far. Just wanted to press a little more
on the deprecation question as I'm still (so far) convinced that my use
case is quite a straightforward justification (I'm looking for confirmation
or correction to my thinking here.) I've simplified my use case a bit if it
helps things:

Use case: "For users that login on a given calendar day, what is the
average login time?"

So I have two event types LOGIN and LOGOUT. I capture a user login session
(using custom windowing or state api, doesn't matter) and I use the default
TimestampCombiner/END_OF_WINDOW because I want my aggregations to not be
delayed.

However per my use case requirements I must window using the LOGIN time. So
I use outputWithTimestamp plus skew configuration to this end.

Since most of my users login and logout within the same calendar day, I get
my per-day aggregations right on time, in real time.

Only for the few users that logout after the day that they login will I see
actual late aggregations produced in which case I can leverage Beam's
various lateness configuration levers to trade completeness for storage,
etc.
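
(Concretely, the levers I mean look something like the following -- the
three-day bound is illustrative:)

.apply(Window.<Long>into(CalendarWindows.days(1))
  .triggering(AfterWatermark
    .pastEndOfWindow()
    .withLateFirings(AfterPane.elementCountAtLeast(1)))
  // Keep the day's window state around long enough to absorb straggler
  // logouts, refining the average when they arrive; a longer bound trades
  // storage for completeness.
  .withAllowedLateness(Duration.standardDays(3))
  .accumulatingFiredPanes())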

This to me seems a *very* straightforward justification for my use of
DoFn#getAllowedTimestampSkew. Shouldn't this justify un-deprecating that
facility?

I realize there are other various solutions, now and coming soon, that
involve holding the watermark -- but any solution that requires holding the
watermark means that I have to give up getting on-time aggregations at the
very end of the calendar day (window). I would much rather (and reasonably
so?) get on-time aggregations covering the majority of my users and be
happy to refine these averages when my few latent users logout in a later
day.

In some Beam documentation [1] there is the idea of "unobservably late
data". That is, I have specific elements that are output late (behind the
watermark), but because they are guaranteed to land *within the window*,
they are effectively promoted to be on-time. This conceptualization of things
seems very well-suited to my simple use case but definitely open to a
different way of thinking in my approach.

My main concern is that my pipeline will be leveraging a Deprecated
facility (DoFn#getAllowedTimestampSkew) but I don't see other viable
options (within Beam) yet.

(Hope I'm not pressing too hard on this question here. I think this use
case is interesting because it ...seems... to be a rather simple/distilled
justification for being able to output data behind the watermark
mid-stream.)

[1] https://beam.apache.org/blog/2016/10/20/test-stream.html


On Sat, Jan 11, 2020 at 10:10 PM Aaron Dixon  wrote:

> Oh nice -- that will be great -- will look forward to this one! Any idea
> when Dataflow will support it?
>
> On Sat, Jan 11, 2020 at 9:07 PM Reuven Lax  wrote:
>
>> There is now (as of last week) a way to hold back the watermark with the
>> state API (though not yet in a released version of Beam). If you set a
>> timer using withOutputTimestamp(t), the watermark will be held to t.
>>
>> On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon  wrote:
>>
>>> Hi Reuven thanks for your quick reply
>>>
>>>  I've tried that but the drag it puts on the watermark was too
>>> intrusive. For example, even if just a single user among many decided to
>>> remain logged-in for a few days, the watermark holds everything else
>>> back.
>>>
>>> This was when using a custom session window. I've recently been using
>>> the State API to do my custom session tracking to avoid issues with
>>> downward merging of windows (see earlier mailing list thread) ... with the
>>> State API .. I'm not able to hold the watermark back (I think) ... but in
>>> any case, I prefer the behavior where the watermark moves forward with the
>>> upstream events and to deal with the very few straggler users by a lateness
>>> configuration.
>>>
>>> Does that make sense? So far to me this seems very reasonable (to want
>>> to keep the watermark moving and deal w/ the late events the few of which
>>> actually fall out of the window using explicit lateness configuration.)
>>>
>>> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax  wrote:
>>>
>>>> Have you looked at using
>>>> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
>>>> downstream watermark back to the beginning of the window (presumably the
>>>> timestamp of the LOGIN event), so you can .call outputWithTimestamp using
>>>> the CLICK GREEN timestamp without needing to set the allowed-lateness skew.
>>>>
>>>> Reuven
>>>>
>>>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon  wrote:
>>>>
>>>>> I've just built a

Re: outputWithTimestamp

2020-01-11 Thread Aaron Dixon
Oh nice -- that will be great -- will look forward to this one! Any idea
when Dataflow will support it?

On Sat, Jan 11, 2020 at 9:07 PM Reuven Lax  wrote:

> There is now (as of last week) a way to hold back the watermark with the
> state API (though not yet in a released version of Beam). If you set a
> timer using withOutputTimestamp(t), the watermark will be held to t.
>
> On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon  wrote:
>
>> Hi Reuven thanks for your quick reply
>>
>>  I've tried that but the drag it puts on the watermark was too intrusive.
>> For example, even if just a single user among many decided to remain
>> logged-in for a few days, the watermark holds everything else back.
>>
>> This was when using a custom session window. I've recently been using the
>> State API to do my custom session tracking to avoid issues with downward
>> merging of windows (see earlier mailing list thread) ... with the State API
>> .. I'm not able to hold the watermark back (I think) ... but in any case, I
>> prefer the behavior where the watermark moves forward with the upstream
>> events and to deal with the very few straggler users by a lateness
>> configuration.
>>
>> Does that make sense? So far to me this seems very reasonable (to want to
>> keep the watermark moving and deal w/ the late events the few of which
>> actually fall out of the window using explicit lateness configuration.)
>>
>> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax  wrote:
>>
>>> Have you looked at using
>>> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
>>> downstream watermark back to the beginning of the window (presumably the
>>> timestamp of the LOGIN event), so you can .call outputWithTimestamp using
>>> the CLICK GREEN timestamp without needing to set the allowed-lateness skew.
>>>
>>> Reuven
>>>
>>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon  wrote:
>>>
>>>> I've just built a pipeline in Beam and after exploring several options
>>>> for my use case, I've ended up relying on the deprecated
>>>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
>>>> quite valid use case. So I suppose this is a vote for un-deprecating this
>>>> API (or a teachable moment in which I could be pointed to a more suitable
>>>> non-deprecated approach.)
>>>>
>>>> I'll stick with a previous simplification of my use case:
>>>>
>>>> I get these events from my users:
>>>> LOGIN
>>>> CLICK GREEN BUTTON
>>>> LOGOUT
>>>>
>>>> I capture user session duration (logout time *minus* login time) and I
>>>> want to perform a PER DAY average (i.e., my window is on CalendarDays) BUT
>>>> where the aggregation's timestamp is the time of the CLICK GREEN event.
>>>>
>>>> So once I calculate and emit a single user's session duration I need to
>>>> .outputWithTimestamp using the CLICK GREEN event's timestamp. This
>>>> involves, of course, outputting with a timestamp *before* the watermark.
>>>>
>>>> In most cases my users LOGOUT in the same day as the CLICK GREEN BUTTON
>>>> event, so even though I'm typically outputting a timestamp before the
>>>> watermark the CalendarDay window is not yet closed and so most user session
>>>> durations do not affect a late aggregation for that CalendarDay.
>>>>
>>>> Only when a LOGOUT occurs on a day later than the CLICK GREEN event do
>>>> I have to contend with potentially late data contributing back to a prior
>>>> CalendarDay.
>>>>
>>>> In any case, I have .withAllowedLateness to allow me to make a call
>>>> here about what I'm willing to trade off (keeping windows open vs. dropping
>>>> data for users with overly long sessions), etc.
>>>>
>>>> This here seems to be a simple scenario (it is effectively my
>>>> real-world scenario) and the
>>>> .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem to cover it in a
>>>> straightforward, effective way.
>>>>
>>>> However of course I don't like building production code on deprecated
>>>> capabilities -- so advice on alternatives (or perhaps a reconsideration of
>>>> this deprecation :) ) would be appreciated.
>>>>
>>>>


Re: outputWithTimestamp

2020-01-11 Thread Aaron Dixon
Hi Reuven thanks for your quick reply

 I've tried that, but the drag it puts on the watermark was too intrusive.
For example, even if just a single user among many decided to remain
logged-in for a few days, the watermark holds everything else back.

This was when using a custom session window. I've recently been using the
State API to do my custom session tracking to avoid issues with downward
merging of windows (see earlier mailing list thread) ... with the State API
.. I'm not able to hold the watermark back (I think) ... but in any case, I
prefer the behavior where the watermark moves forward with the upstream
events and to deal with the very few straggler users by a lateness
configuration.

Does that make sense? So far to me this seems very reasonable (to want to
keep the watermark moving and deal w/ the late events the few of which
actually fall out of the window using explicit lateness configuration.)

On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax  wrote:

> Have you looked at using
> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
> downstream watermark back to the beginning of the window (presumably the
> timestamp of the LOGIN event), so you can .call outputWithTimestamp using
> the CLICK GREEN timestamp without needing to set the allowed-lateness skew.
>
> Reuven
>
> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon  wrote:
>
>> I've just built a pipeline in Beam and after exploring several options
>> for my use case, I've ended up relying on the deprecated
>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
>> quite valid use case. So I suppose this is a vote for un-deprecating this
>> API (or a teachable moment in which I could be pointed to a more suitable
>> non-deprecated approach.)
>>
>> I'll stick with a previous simplification of my use case:
>>
>> I get these events from my users:
>> LOGIN
>> CLICK GREEN BUTTON
>> LOGOUT
>>
>> I capture user session duration (logout time *minus* login time) and I
>> want to perform a PER DAY average (i.e., my window is on CalendarDays) BUT
>> where the aggregation's timestamp is the time of the CLICK GREEN event.
>>
>> So once I calculate and emit a single user's session duration I need to
>> .outputWithTimestamp using the CLICK GREEN event's timestamp. This
>> involves, of course, outputting with a timestamp *before* the watermark.
>>
>> In most cases my users LOGOUT in the same day as the CLICK GREEN BUTTON
>> event, so even though I'm typically outputting a timestamp before the
>> watermark the CalendarDay window is not yet closed and so most user session
>> durations do not affect a late aggregation for that CalendarDay.
>>
>> Only when a LOGOUT occurs on a day later than the CLICK GREEN event do I
>> have to contend with potentially late data contributing back to a prior
>> CalendarDay.
>>
>> In any case, I have .withAllowedLateness to allow me to make a call here
>> about what I'm willing to trade off (keeping windows open vs. dropping data for
>> users with overly long sessions), etc.
>>
>> This here seems to be a simple scenario (it is effectively my real-world
>> scenario) and the .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem
>> to cover it in a straightforward, effective way.
>>
>> However of course I don't like building production code on deprecated
>> capabilities -- so advice on alternatives (or perhaps a reconsideration of
>> this deprecation :) ) would be appreciated.
>>
>>


outputWithTimestamp

2020-01-11 Thread Aaron Dixon
I've just built a pipeline in Beam and after exploring several options for
my use case, I've ended up relying on the deprecated
.outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
quite valid use case. So I suppose this is a vote for un-deprecating this
API (or a teachable moment in which I could be pointed to a more suitable
non-deprecated approach.)

I'll stick with a previous simplification of my use case:

I get these events from my users:
LOGIN
CLICK GREEN BUTTON
LOGOUT

I capture user session duration (logout time *minus* login time) and I want
to perform a PER DAY average (i.e., my window is on CalendarDays) BUT where
the aggregation's timestamp is the time of the CLICK GREEN event.

So once I calculate and emit a single user's session duration I need to
.outputWithTimestamp using the CLICK GREEN event's timestamp. This
involves, of course, outputting with a timestamp *before* the watermark.
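
(The mechanics, as a sketch -- Session and clickGreenTime() stand in for my
real types:)

static class StampAtClickTime extends DoFn<Session, Session> {
  @Override
  public Duration getAllowedTimestampSkew() {
    // Assumption: outputs may be stamped up to 7 days behind the element
    // timestamp; sessions longer than that risk being dropped.
    return Duration.standardDays(7);
  }

  @ProcessElement
  public void process(ProcessContext c) {
    Session s = c.element();
    // Re-stamp at the CLICK GREEN time, which may precede the watermark.
    c.outputWithTimestamp(s, s.clickGreenTime());
  }
}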

In most cases my users LOGOUT in the same day as the CLICK GREEN BUTTON
event, so even though I'm typically outputting a timestamp before the
watermark the CalendarDay window is not yet closed and so most user session
durations do not affect a late aggregation for that CalendarDay.

Only when a LOGOUT occurs on a day later than the CLICK GREEN event do I
have to contend with potentially late data contributing back to a prior
CalendarDay.

In any case, I have .withAllowedLateness to allow me to make a call here
about what I'm willing to trade off (keeping windows open vs. dropping data for
users with overly long sessions), etc.

This here seems to be a simple scenario (it is effectively my real-world
scenario) and the .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem
to cover it in a straightforward, effective way.

However of course I don't like building production code on deprecated
capabilities -- so advice on alternatives (or perhaps a reconsideration of
this deprecation :) ) would be appreciated.


Re: Custom window invariants and

2020-01-11 Thread Aaron Dixon
For future reference (for anyone searching mailing list for this or similar
issue), the ticket Kenneth pointed to (
https://issues.apache.org/jira/browse/BEAM-654) is precisely the use case I
have -- "session windows with a terminal/stop event." For now I've opted to
use the State API (
https://beam.apache.org/blog/2017/02/13/stateful-processing.html) to
achieve my particularly special windowing needs as it yields a
straightforward implementation that I can prove correct (and for my use
cases distributing the windowing logic for a key is not a performance win.)

However I'm very interested in seeing how the windowing merge semantics get
nailed down / evolve, and to explore what kind of innovative stuff could be
ultimately done with them.

On Fri, Jan 10, 2020 at 4:38 PM Aaron Dixon  wrote:

> Once again this is a great help, thank you Kenneth
>
> On Wed, Jan 8, 2020 at 3:03 PM Kenneth Knowles  wrote:
>
>> Hmm. I've seen this manifest in some other tweaked versions of Sessions.
>> Your invariants are right. In fact, the Nexmark queries have auctions that
>> truncate in a similar way. This prompted
>> https://issues.apache.org/jira/browse/BEAM-654.  I think we have not
>> really nailed down the right spec for merging, and we certainly aren't
>> enforcing it. To be robust, your merging should be associative and
>> commutative, which means that you can't have an "end of session" event that
>> contradicts a merge that occurred. OTOH I also know that Tyler has hacked
>> window functions that split... it is mostly unexplored, semantically.
>>
>> About the error, this may help debug: The "state address windows" for a
>> given merged window are all the windows that contribute to it. This means
>> that when windows A and B merge to become a window AB, we can leave the
>> accumulated state stored with A and B and just note that when we read from
>> AB we actually have to read from both A and B*. So suppose windows A and B
>> are about to merge. Before merge, the state address window map is:
>>
>> A -> [A]
>> B -> [B]
>>
> After merge, there is a new window AB and a "window to state address window"
>> mapping
>>
>> AB -> [A, B]
>>
>> The error means that there is more than one merged window that will read
>> data from a pre-merged window. So there is a situation like
>>
>> AB -> [A, B]
>> BC -> [B, C]
>>
>> This is not intended to happen. It would be the consequence of B merging
>> into two different new windows. Hence it is an internal error. Most likely
>> a bug or a mismatch based on the assumptions. Note that this code/logic is
>> shared by all runners. I do think you can write a WindowFn that induces it.
>>
>> Kenn
>>
>> *this was intended to be a performance optimization, but eagerly copying
>> the data turned out faster so now it is a legacy compatibility thing that
> we could remove, I think, but changing this code is tricky
>>
>> On Tue, Jan 7, 2020 at 3:27 PM Aaron Dixon  wrote:
>>
>>> What I'm attempting is a variation on Session windows in which there may
>>> exist a "terminal" element in the stream that immediately stops the session
>>> (or perhaps after some configured delay.)
>>>
>>> My implementation behaves just like Sessions until any such "terminal"
>>> element is encountered in which case I mark the window as "terminal" and
>>> all windows "merge down" such that any terminal windows get to dictate the
>>> Interval.end()/Window.maxTimestamp().
>>>
>>> So, trivial example, if I have windows W1 [0, 100) and W2 [50, 75,
>>> terminal = true] then the merged result will be W3 [0, 75).
>>>
>>> I've been successful doing this so far but I've been inferring some
>>> invariants about windows that I'm not sure are official or documented
>>> anywhere.
>>>
>>> The invariants that I've inferred go like this:
>>>
>>> (I) Definition. An element is "in" window W if it originated in W or in
>>> a window that was merged into W (recursively).
>>>
>>> (II) Invariant. Any element, e, in window W MUST have e.timestamp <=
>>> W.maxTimestamp().
>>>
>>> So far, I think this is obvious and true stuff (I hope). (It would
>>> actually be great if there were a way for II to not have to hold, but
>>> that is a whole separate discussion, I think.)
>>>
>>> The main invariant I'm trying to formalize is one that allows me to
>>> "merge down" -- i.e., to merge in s

Re: Custom window invariants and

2020-01-10 Thread Aaron Dixon
Once again this is a great help, thank you Kenneth

On Wed, Jan 8, 2020 at 3:03 PM Kenneth Knowles  wrote:

> Hmm. I've seen this manifest in some other tweaked versions of Sessions.
> Your invariants are right. In fact, the Nexmark queries have auctions that
> truncate in a similar way. This prompted
> https://issues.apache.org/jira/browse/BEAM-654.  I think we have not
> really nailed down the right spec for merging, and we certainly aren't
> enforcing it. To be robust, your merging should be associative and
> commutative, which means that you can't have an "end of session" event that
> contradicts a merge that occurred. OTOH I also know that Tyler has hacked
> window functions that split... it is mostly unexplored, semantically.
>
> About the error, this may help debug: The "state address windows" for a
> given merged window are all the windows that contribute to it. This means
> that when windows A and B merge to become a window AB, we can leave the
> accumulated state stored with A and B and just note that when we read from
> AB we actually have to read from both A and B*. So suppose windows A and B
> are about to merge. Before merge, the state address window map is:
>
> A -> [A]
> B -> [B]
>
> After merge, there is a new window AB and a "window to state address window"
> mapping
>
> AB -> [A, B]
>
> The error means that there is more than one merged window that will read
> data from a pre-merged window. So there is a situation like
>
> AB -> [A, B]
> BC -> [B, C]
>
> This is not intended to happen. It would be the consequence of B merging
> into two different new windows. Hence it is an internal error. Most likely
> a bug or a mismatch based on the assumptions. Note that this code/logic is
> shared by all runners. I do think you can write a WindowFn that induces it.
>
> Kenn
>
> *this was intended to be a performance optimization, but eagerly copying
> the data turned out faster so now it is a legacy compatibility thing that
> we could remove, I think, but changing this code is tricky
>
> On Tue, Jan 7, 2020 at 3:27 PM Aaron Dixon  wrote:
>
>> What I'm attempting is a variation on Session windows in which there may
>> exist a "terminal" element in the stream that immediately stops the session
>> (or perhaps after some configured delay.)
>>
>> My implementation behaves just like Sessions until any such "terminal"
>> element is encountered in which case I mark the window as "terminal" and
>> all windows "merge down" such that any terminal windows get to dictate the
>> Interval.end()/Window.maxTimestamp().
>>
>> So, trivial example, if I have windows W1 [0, 100) and W2 [50, 75,
>> terminal = true] then the merged result will be W3 [0, 75).
>>
>> I've been successful doing this so far but I've been inferring some
>> invariants about windows that I'm not sure are official or documented
>> anywhere.
>>
>> The invariants that I've inferred go like this:
>>
>> (I) Definition. An element is "in" window W if it originated in W or in a
>> window that was merged into W (recursively).
>>
>> (II) Invariant. Any element, e, in window W MUST have e.timestamp <=
>> W.maxTimestamp().
>>
>> So far, I think this is obvious and true stuff (I hope). (It would
>> actually be great if there were a way for (II) not to have to hold, but
>> that is a whole other discussion, I think.)
>>
>> The main invariant I'm trying to formalize is one that allows me to
>> "merge down" -- i.e., to merge in such a way that the merged window's
>> (mergedResult's) maxTimestamp *is less than* one of the source's
>> (toBeMerged's) windows' maxTimestamp.
>>
>> The (undocumented?) invariant I've been working from goes something like
>> this:
>>
>> (III) Corollary. Windows W1 and W2 can merge such that either window's
>> maxTimestamp() is regressed (moved backward in time, aka "merge down") in
>> the merged window -- however, they cannot merge such that (II) is ever
>> violated.
>>
>> Is this correct?
>>
>> (If this can be confirmed, I'll go back and ensure I'm not violating the
>> merge() precondition and these invariants, and post some code if needed.)
>> Thank you for the assistance here!
>>
>>
>> On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax  wrote:
>>
>>> Have you used Dataflow's update feature on this pipeline? Also, do
>>> you have the code for your WindowFn?
>>>
>>> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon  wrote:
>>>
>>>> Dataflow. (See

Re: Custom window invariants and

2020-01-07 Thread Aaron Dixon
What I'm attempting is a variation on Session windows in which there may
exist a "terminal" element in the stream that immediately stops the session
(or perhaps after some configured delay.)

My implementation behaves just like Sessions until any such "terminal"
element is encountered in which case I mark the window as "terminal" and
all windows "merge down" such that any terminal windows get to dictate the
Interval.end()/Window.maxTimestamp().

So, trivial example, if I have windows W1 [0, 100) and W2 [50, 75, terminal
= true] then the merged result will be W3 [0, 75).
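
In code, the merge I'm describing looks roughly like this (a sketch, not my
actual code; TerminalWindow is a stand-in for a custom IntervalWindow-like
type carrying a `terminal` flag, and it collapses everything for the key into
one window, a single-session simplification):

```java
@Override
public void mergeWindows(MergeContext c) throws Exception {
  List<TerminalWindow> all = new ArrayList<>(c.windows());
  if (all.size() < 2) {
    return;
  }
  Instant start = all.get(0).start();
  Instant end = all.get(0).end();
  Instant terminalEnd = null;
  for (TerminalWindow w : all) {
    if (w.start().isBefore(start)) start = w.start();
    if (w.end().isAfter(end)) end = w.end();
    if (w.isTerminal()) terminalEnd = w.end(); // terminal end wins
  }
  // "Merge down": W1 [0, 100) + W2 [50, 75, terminal] -> W3 [0, 75).
  boolean terminal = terminalEnd != null;
  c.merge(all, new TerminalWindow(start, terminal ? terminalEnd : end, terminal));
}
```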

I've been successful doing this so far but I've been inferring some
invariants about windows that I'm not sure are official or documented
anywhere.

The invariants that I've inferred go like this:

(I) Definition. An element is "in" window W if it originated in W or in a
window that was merged into W (recursively).

(II) Invariant. Any element, e, in window W MUST have e.timestamp <=
W.maxTimestamp().

So far, I think this is obvious and true stuff (I hope). (It would actually
be great if there were a way for (II) not to have to hold, but that is a
whole other discussion, I think.)

The main invariant I'm trying to formalize is one that allows me to "merge
down" -- i.e., to merge in such a way that the merged window's
(mergedResult's) maxTimestamp *is less than* one of the source's
(toBeMerged's) windows' maxTimestamp.

The (undocumented?) invariant I've been working from goes something like
this:

(III) Corollary. Windows W1 and W2 can merge such that either window's
maxTimestamp() is regressed (moved backward in time, aka "merge down") in
the merged window -- however, they cannot merge such that (II) is ever
violated.

Is this correct?

(If this can be confirmed, I'll go back and ensure I'm not violating the
merge() precondition and these invariants, and post some code if needed.)
Thank you for the assistance here!


On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax  wrote:

> Have you used Dataflow's update feature on this pipeline? Also, do
> you have the code for your WindowFn?
>
> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon  wrote:
>
>> Dataflow. (See stacktrace)
>>
>> On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax  wrote:
>>
>>> Which runner are you using?
>>>
>>> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon  wrote:
>>>
>>>> I get an IllegalStateException " is in more than one state
>>>> address window set" (stacktrace below).
>>>>
>>>> What does this mean? What invariant of custom window implementation
>>>> & merging am I violating?
>>>>
>>>> Thank you for any advice.
>>>>
>>>> ```
>>>> java.lang.IllegalStateException:
>>>> {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more
>>>> than one state address window set
>>>> at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
>>>> (Preconditions.java:588)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants
>>>> (MergingActiveWindowSet.java:334)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist
>>>> (MergingActiveWindowSet.java:88)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist
>>>> (ReduceFnRunner.java:380)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement
>>>> (StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>>>> ...
>>>> ```
>>>>
>>>


Re: Custom window invariants and

2020-01-07 Thread Aaron Dixon
Dataflow. (See stacktrace)

On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax  wrote:

> Which runner are you using?
>
> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon  wrote:
>
>> I get an IllegalStateException " is in more than one state
>> address window set" (stacktrace below).
>>
>> What does this mean? What invariant of custom window implementation
>> & merging am I violating?
>>
>> Thank you for any advice.
>>
>> ```
>> java.lang.IllegalStateException:
>> {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more
>> than one state address window set
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
>> (Preconditions.java:588)
>> at
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants
>> (MergingActiveWindowSet.java:334)
>> at
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist
>> (MergingActiveWindowSet.java:88)
>> at
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist
>> (ReduceFnRunner.java:380)
>> at
>> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement
>> (StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>> ...
>> ```
>>
>


Custom window invariants and

2020-01-07 Thread Aaron Dixon
I get an IllegalStateException " is in more than one state address
window set" (stacktrace below).

What does this mean? What invariant of custom window implementation
& merging am I violating?

Thank you for any advice.

```
java.lang.IllegalStateException:
{[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more
than one state address window set
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
(Preconditions.java:588)
at
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants
(MergingActiveWindowSet.java:334)
at
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist
(MergingActiveWindowSet.java:88)
at
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist
(ReduceFnRunner.java:380)
at
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement
(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
...
```


Re: session window puzzle

2019-12-08 Thread Aaron Dixon
Following up w/ defeat. My "simple" solution (perhaps no surprise) did not
work. I suppose I was hoping that `maxTimestamp` was strictly used to
determine firing of windows wrt the *input* watermark. And so I was hoping
to get away with having windows containing elements w/ timestamps larger
than this max.

But I get errors,

```
TimestampCombiner moved element from 2019-01-01T01:04:00.000Z to earlier
time 2019-01-01T01:01:00.000Z for window
[2019-01-01T01:00:00.000Z..2019-01-01T01:01:00.001Z)
```

If `maxTimestamp` not only governs window closing/firing but also restricts
element timestamps in the window, I don't see a path forward using custom
windows -- i.e., how could a window ever fire with a combined (GREEN)
timestamp, since this timestamp would necessarily have to precede other
elements in the window (e.g., BLUE)?

Given these setbacks my latest inclination is to return to using
`outputWithTimestamp` BUT using TimestampCombiner/LATEST *plus*
 `withAllowedTimestampSkew`.

This seems to offer a benefit -- so long as I'm willing to configure my
solution in terms of lateness and late firings. But
`withAllowedTimestampSkew` is deprecated. Why?

As I think about it, it seems quite useful and suited to this kind of
problem. The alternative of using TimestampCombiner/EARLIEST has the
deal-breaking property of dragging the watermark in unpredictable and
uncontrollable ways.

(I say this b/c afaiu there isn't much in the way of explicit control of
the watermark ... a la "never let the watermark drag in **this stage** by
**X**" or something like that...) whereas

`withAllowedTimestampSkew`, while forcing the solution into dealing
with lateness configurations, at least (1) keeps the watermark predictably
moving, and (2) allows for the configuration/use of `withAllowedLateness`
as a lever to govern what I'm okay with retaining & dropping, etc.

The reasons given in the @deprecation of `withAllowedTimestampSkew` seem to
claim that dealing in lateness is somehow bad -- but for my scenario, I'm
beginning to think it is well suited?

(Even my earlier suggestion about more configurable tuning of
TimestampCombiner would likely have the same unpredictable effects on
watermark progression. But `withAllowedLateness` gives a concrete
predictability, and short of any other special per-stage customized
regulation of the watermark, what other options do I have?)
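
For concreteness, the shape I have in mind goes something like this (a
sketch only; Span and greenTimestamp() stand in for my actual types, and the
durations are arbitrary):

```java
// Re-stamp each aggregate backward to its GREEN time (hence the skew),
// then window with explicit allowed lateness as the retain/drop lever.
PCollection<Span> restamped = spans.apply(
    WithTimestamps.of((Span s) -> s.greenTimestamp())
        .withAllowedTimestampSkew(Duration.standardDays(7))); // deprecated knob

restamped.apply(
    Window.<Span>into(FixedWindows.of(Duration.standardDays(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(7))
        .accumulatingFiredPanes());
```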




On Sat, Dec 7, 2019 at 10:54 PM Aaron Dixon  wrote:

> > In your proposed solution, it probably could be expressed as a new
> merging WindowFn. You would assign each Green element to two tagged windows
> that were GreenFromOrange and GreenToBlue type, and have a separate window
> tag for OrangeWindow and BlueWindow. Then GreenFromOrange merges with
> OrangeWindow only, etc.
>
> I'm finally returning to considering this implementation and, getting into
> the brass tacks of it, I'm realizing I didn't 100% understand your proposal^^
>
> With a merging-windows solution I am assuming that whatever the final
> merge is, it will produce one and only one window (so that my
> aggregation is the single produced value I need).
>
> As a reminder my end goal is to emit the aggregation with an event
> timestamp equal to the GREEN event.
>
> I am very curious about your proposal, if you could flesh it out more (how
> the merges would carry out until the final single window). However, wouldn't
> the following very simple custom window strategy do the trick:
>
> 1) Assign windows just like Beam's out-of-the-box Sessions (using a
> configured gap duration) but recording the GREEN timestamp w/in the custom
> window object when we see it.
> 2) When the terminal BLUE event is seen, merge/collapse all windows such
> that the final window's right-bound (ie, `maxTimestamp()`) falls back to be
> equal to the GREEN value. (This implies of course that the window's right
> bound could fall back substantially.) The default
> TimestampCombiner/END_OF_WINDOW would then ensure that the emitted output
> timestamp of the agg was this GREEN timestamp.
>
> The input watermark would likely be far ahead of GREEN so of course this
> final merged window would presumably fire immediately.
>
> The output watermark (I think) is held to the minimum of elements in live
> windows, so we should not be regressing the watermark by pulling our
> window's right-bound far back to GREEN in this way.
>
> Any notes on this would be appreciated. Especially confirmation about how
> I'm thinking about the watermark would help me get a handle on implementing
> correct custom windows.
>
> Thanks!
>
>
>
>
>
>
>
>
>
> On Fri, Nov 15, 2019 at 3:51 PM Kenneth Knowles  wrote:
>
>> On Wed, Nov 13, 2019 at 7:39 PM Aaron Dixon  wrote:
>>
>>> This is a great help. Thank you. I like the custom window solution
>>> pattern as a

Re: session window puzzle

2019-12-07 Thread Aaron Dixon
> In your proposed solution, it probably could be expressed as a new
merging WindowFn. You would assign each Green element to two tagged windows
that were GreenFromOrange and GreenToBlue type, and have a separate window
tag for OrangeWindow and BlueWindow. Then GreenFromOrange merges with
OrangeWindow only, etc.

I'm finally returning to considering this implementation and, getting into
the brass tacks of it, I'm realizing I didn't 100% understand your proposal^^

With a merging-windows solution I am assuming that whatever the final merge
is, it will produce one and only one window (so that my
aggregation is the single produced value I need).

As a reminder my end goal is to emit the aggregation with an event
timestamp equal to the GREEN event.

I am very curious about your proposal, if you could flesh it out more (how
the merges would carry out until the final single window). However, wouldn't
the following very simple custom window strategy do the trick:

1) Assign windows just like Beam's out-of-the-box Sessions (using a
configured gap duration) but recording the GREEN timestamp w/in the custom
window object when we see it (see the sketch after this list).
2) When the terminal BLUE event is seen, merge/collapse all windows such
that the final window's right-bound (ie, `maxTimestamp()`) falls back to be
equal to the GREEN value. (This implies of course that the window's right
bound could fall back substantially.) The default
TimestampCombiner/END_OF_WINDOW would then ensure that the emitted output
timestamp of the agg was this GREEN timestamp.
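
In code, step (1) would be roughly the following (a sketch only;
GreenAwareWindow and isGreen are stand-ins, not real types):

```java
// Sessions-style assignment, except the custom window object records the
// GREEN timestamp when the element is a GREEN click.
@Override
public Collection<GreenAwareWindow> assignWindows(AssignContext c) {
  Instant ts = c.timestamp();
  Instant greenTs = isGreen(c.element()) ? ts : null; // placeholder predicate
  return Collections.singletonList(
      new GreenAwareWindow(ts, ts.plus(gapDuration), greenTs));
}
```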

The input watermark would likely be far ahead of GREEN so of course this
final merged window would presumably fire immediately.

The output watermark (I think) is held to the minimum of elements in live
windows, so we should not be regressing the watermark by pulling our
window's right-bound far back to GREEN in this way.

Any notes on this would be appreciated. Especially confirmation about how
I'm thinking about the watermark would help me get a handle on implementing
correct custom windows.

Thanks!









On Fri, Nov 15, 2019 at 3:51 PM Kenneth Knowles  wrote:

> On Wed, Nov 13, 2019 at 7:39 PM Aaron Dixon  wrote:
>
>> This is a great help. Thank you. I like the custom window solution
>> pattern as a way to hold the watermark and merge down to keep the watermark
>> where it is needed. Perhaps there is some interesting generalized session
>> window here.. I'll have to digest the stateful DoFn approach. Avoiding
>> unnecessary shuffles is a good note.
>>
>> As a side note, there is MIN, MAX and END_OF_WINDOW TimestampCombiner.
>> Has it been discussed to ever allow more customization here? Seems like
>> customizing the combiner with element-awareness would have solved this
>> problem, as well.
>>
>
> Good question :-). In fact, the original design was OutputTimeFn that was
> a custom user-defined function.
>
>  - Discussed a bit here:
> https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#heading=h.45gyyckqhg4c
>  - Removed here: https://github.com/apache/beam/pull/2683
>
> Users found the name and choices confusing. Also, the runner needs to grok
> the behavior of it in order to implement many optimizations. Before
> portability there were a bunch of instanceof checks and anything not on the
> happy path was slower. With portability, it needs to be represented in
> protobuf. Reinstating this would mean adding a new CUSTOM enum to
> TimestampCombiner and it would have performance costs.
>
> Kenn
>
>
>>
>>
>> On Wed, Nov 13, 2019 at 7:56 PM Kenneth Knowles  wrote:
>>
>>> You've done a very good analysis* and I think your solution is pretty
>>> clever. The simple fact is this: the watermark has to be held to the
>>> minimum of any output you intend to produce. So for your use case, the hold
>>> has to be the timestamp of the Green element. Your solution does hold the
>>> watermark to the right time. I have a couple thoughts that may be helpful.
>>>
>>> 0. If you partition by user does the stream contain a bunch of Orange,
>>> Green, Blue elements? Is it possible that a session contains multiple
>>> [Orange, Green, Blue] sequences? Is it possible that an [Orange, Green,
>>> Blue] sequence is split across multiple sessions?
>>>
>>> 1. In your proposed solution, it probably could be expressed as a new
>>> merging WindowFn. You would assign each Green element to two tagged windows
>>> that were GreenFromOrange and GreenToBlue type, and have a separate window
>>> tag for OrangeWindow and BlueWindow. Then GreenFromOrange merges with
>>> OrangeWindow only, etc.
>>>
>>> 2. This might also turn out simply as a stateful DoFn, where you
>>> manually ma

Re: real real-time beam

2019-12-06 Thread Aaron Dixon
>>> of granularity). This was mentioned in the initial discussion about event
>>> time ordering, and is part of the design doc - users should be allowed to
>>> provide UDF for extracting time-correlated ordering field (which means
>>> ability to choose a preferred, or authoritative, observer which assigns
>>> unambiguous ordering to events). Examples of this might include Kafka
>>> offsets as well, or any queue index for that matter. This is not yet
>>> implemented, but could (should) be in the future.
>>>
>>> The only case where these two things are (somewhat) different is the
>>> case mentioned by @Steve - if the output is stateless ParDo, which will get
>>> fused. But that is only because the processing is single-threaded per key,
>>> and therefore the ordering is implied by timer ordering (and careful here,
>>> many runners don't have this ordering 100% correct, as of now - this
>>> problem luckily appears only when there are multiple timers per key).
>>> Moreover, if there should be a failure, then the output might (would) go
>>> back in time anyway. If there would be a shuffle operation after
>>> GBK/Combine, then the ordering is no longer guaranteed and must be
>>> explicitly taken care of.
>>>
>>> Last note, I must agree with @Rui that all these discussions are very
>>> much related to retractions (precisely the ability to implement them).
>>>
>>> Jan
>>> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>>
>>> Hi Aaron,
>>>
>>> Another insightful observation.
>>>
>>> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
>>> there is a per-key sequence number attached. It is included in metadata
>>> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially
>>> referred to as the "pane index". You can also make use of the "on time
>>> index" if you like. The best way to access this metadata is to add a
>>> parameter of type PaneInfo to your DoFn's @ProcessElement method. This
>>> works for stateful or stateless DoFn.
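>>>
>>> For example, a minimal sketch (the element type here is illustrative):
>>>
>>> ```java
>>> @ProcessElement
>>> public void process(@Element KV<String, Long> agg, PaneInfo pane,
>>>     OutputReceiver<KV<String, Long>> out) {
>>>   long paneIndex = pane.getIndex(); // per-key firing sequence number
>>>   boolean late = pane.getTiming() == PaneInfo.Timing.LATE;
>>>   out.output(agg);
>>> }
>>> ```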
>>>
>>> Most of Beam's IO connectors do not explicitly enforce that outputs
>>> occur in pane index order but instead rely on the hope that the runner
>>> delivers panes in order to the sink. IMO this is dangerous but it has not
>>> yet caused a known issue. In practice, each "input key to output key 'path'
>>> " through a pipeline's logic does preserve order for all existing runners
>>> AFAIK and it is the formalization that is missing. It is related to an
>>> observation by +Rui Wang  that processing
>>> retractions requires the same key-to-key ordering.
>>>
>>> I will not try to formalize this notion in this email. But I will note
>>> that since it is universally assured, it would be zero cost and
>>> significantly safer to formalize it and add an annotation noting it was
>>> required. It has nothing to do with event time ordering, only trigger
>>> firing ordering.
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>>
>>>
>>> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada 
>>> wrote:
>>>
>>>> The blog posts on stateful and timely computation with Beam should help
>>>> clarify a lot about how to use state and timers to do this:
>>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>
>>>> You'll see there how there's an implicit per-single-element grouping
>>>> for each key, so state and timers should support your use case very well.
>>>>
>>>> Best
>>>> -P.
>>>>
>>>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz 
>>>> wrote:
>>>>
>>>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>>>> while it is not guaranteed, in practice the sink will observe the trigger
>>>>> firings in order (per key), since it'll be fused to the output of the GBK
>>>>> operation (in all runners I know of).
>>>>>
>>>>> There have been a couple threads about trigger ordering as well on t

Re: real real-time beam

2019-11-25 Thread Aaron Dixon
@Jan @Pablo Thank you

@Pablo In this case it's a single global windowed Combine/perKey, triggered
per element. Keys are few (client accounts) so they can live forever.

It looks like just by virtue of using a stateful ParDo I could get this
final execution to be "serialized" per key. (Then I could simply do the
compare-and-swap using Beam's state mechanism to keep track of the "latest
trigger timestamp" instead of having to orchestrate compare-and-swap in the
target store :thinking:.)
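
Something along these lines, as a sketch (Update and writeToStore are
placeholders for my value type and actual sink call):

```java
class LatestOnlySink extends DoFn<KV<String, Update>, Void> {
  @StateId("latestTs")
  private final StateSpec<ValueState<Instant>> latestTs = StateSpecs.value();

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("latestTs") ValueState<Instant> latest) {
    Instant seen = latest.read();
    if (seen == null || c.timestamp().isAfter(seen)) {
      latest.write(c.timestamp());
      writeToStore(c.element().getKey(), c.element().getValue()); // placeholder
    }
    // else: an older pane arrived out of order; drop it so the store
    // never regresses.
  }
}
```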



On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský  wrote:

> One addition, to make the list of options exhaustive, there is probably
> one more option
>
>   c) create a ParDo keyed by primary key of your sink, cache the last
> write in there and compare it locally, without the need to query the
> database
>
> It would still need some timer to clear values after watermark + allowed
> lateness, because otherwise you would have to cache your whole database
> on workers. But because you don't need actual ordering, you just need
> the most recent value (if I got it right) this might be an option.
>
> Jan
>
> On 11/25/19 10:53 PM, Jan Lukavský wrote:
> > Hi Aaron,
> >
> > maybe someone else will give another option, but if I understand
> > correctly what you want to solve, then you essentially have to do either:
> >
> >  a) use the compare & swap mechanism in the sink you described
> >
> >  b) use a buffer to buffer elements inside the outputting ParDo and
> > only output them when watermark passes (using a timer).
> >
> > There is actually an ongoing discussion about how to make option b)
> > user-friendly and part of Beam itself, but currently there is no
> > out-of-the-box solution for that.
> >
> > Jan
> >
> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
> >> Suppose I trigger a Combine per-element (in a high-volume stream) and
> >> use a ParDo as a sink.
> >>
> >> I assume there is no guarantee about the order that my ParDo will see
> >> these triggers, especially as it processes in parallel, anyway.
> >>
> >> That said, my sink writes to a db or cache and I would not like the
> >> cache to ever regress its value to something "before" what it has
> >> already written.
> >>
> >> Is the best way to solve this problem to always write the event-time
> >> in the cache and do a compare-and-swap only updating the sink if the
> >> triggered value in-hand is later than the target value?
> >>
> >> Or is there a better way to guarantee that my ParDo sink will process
> >> elements in-order? (Eg, if I can give up per-event/real-time, then a
> >> delay-based trigger would probably be sufficient I imagine.)
> >>
> >> Thanks for advice!
>


real real-time beam

2019-11-25 Thread Aaron Dixon
Suppose I trigger a Combine per-element (in a high-volume stream) and use a
ParDo as a sink.

I assume there is no guarantee about the order that my ParDo will see these
triggers, especially as it processes in parallel, anyway.

That said, my sink writes to a db or cache and I would not like the cache
to ever regress its value to something "before" what it has already written.

Is the best way to solve this problem to always write the event-time in the
cache and do a compare-and-swap only updating the sink if the triggered
value in-hand is later than the target value?

Or is there a better way to guarantee that my ParDo sink will process
elements in-order? (Eg, if I can give up per-event/real-time, then a
delay-based trigger would probably be sufficient I imagine.)

Thanks for advice!


Re: session window puzzle

2019-11-13 Thread Aaron Dixon
This is a great help. Thank you. I like the custom window solution
pattern as a way to hold the watermark and merge down to keep the watermark
where it is needed. Perhaps there is some interesting generalized session
window here.. I'll have to digest the stateful DoFn approach. Avoiding
unnecessary shuffles is a good note.

As a side note, there is MIN, MAX and END_OF_WINDOW TimestampCombiner. Has
it been discussed to ever allow more customization here? Seems like
customizing the combiner with element-awareness would have solved this
problem, as well.


On Wed, Nov 13, 2019 at 7:56 PM Kenneth Knowles  wrote:

> You've done a very good analysis* and I think your solution is pretty
> clever. The simple fact is this: the watermark has to be held to the
> minimum of any output you intend to produce. So for your use case, the hold
> has to be the timestamp of the Green element. Your solution does hold the
> watermark to the right time. I have a couple thoughts that may be helpful.
>
> 0. If you partition by user does the stream contain a bunch of Orange,
> Green, Blue elements? Is it possible that a session contains multiple
> [Orange, Green, Blue] sequences? Is it possible that an [Orange, Green,
> Blue] sequence is split across multiple sessions?
>
> 1. In your proposed solution, it probably could be expressed as a new
> merging WindowFn. You would assign each Green element to two tagged windows
> that were GreenFromOrange and GreenToBlue type, and have a separate window
> tag for OrangeWindow and BlueWindow. Then GreenFromOrange merges with
> OrangeWindow only, etc. (see the sketch after these points).
>
> 2. This might also turn out simply as a stateful DoFn, where you manually
> manage what state the funnel is in. When you set a timer to wait for the
> Orange element, you may need an upcoming feature where you set a timer for
> a future event time but the watermark is held to the Green element's
> timestamp. CC Reuven on that use case.
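>
> A sketch of the assignment in (1) above -- TaggedWindow, Kind, and kindOf
> are all hypothetical here, not Beam API:
>
> ```java
> @Override
> public Collection<TaggedWindow> assignWindows(AssignContext c) {
>   Instant ts = c.timestamp();
>   switch (kindOf(c.element())) { // hypothetical classifier
>     case ORANGE:
>       return Arrays.asList(new TaggedWindow(Kind.ORANGE_WINDOW, ts, gap));
>     case BLUE:
>       return Arrays.asList(new TaggedWindow(Kind.BLUE_WINDOW, ts, gap));
>     case GREEN: // Green lands in two windows so each leg merges independently
>       return Arrays.asList(
>           new TaggedWindow(Kind.GREEN_FROM_ORANGE, ts, gap),
>           new TaggedWindow(Kind.GREEN_TO_BLUE, ts, gap));
>     default:
>       throw new IllegalArgumentException("unknown element kind");
>   }
> }
> ```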
>
> What I would like to avoid is you having to do two shuffles (on whatever
> runner). This should be doable with one.
>
> *SessionWindow plus EARLIEST holding up the watermark/pipeline was an
> early complaint. That is part of why we switched the default to
> end-of-window (also it is easier to understand and more efficient to
> compute)
>
> Kenn
>
> On Wed, Nov 13, 2019 at 3:25 PM Aaron Dixon  wrote:
>
>> This is a real use case we have, but simplified:
>>
>> My user session look like this: user visits a page, and clicks three
>> buttons: Orange then Green then Blue.
>>
>> I need to compute the average time between Orange & Blue clicks but I
>> need to window on the timestamp of the green button click.
>>
>> In requirements terms: Compute average time between Orange and Blue for
>> all Green clicks that occur on Monday. (So User could click Orange on
>> Sunday, Green on Monday and Blue on Tuesday.)
>>
>> One strategy is to try to use a single SessionWindow to capture the
>> entire user session; then calculate the *span* (time between Orange and
>> Blue clicks) and *then* compute average of all spans.
>>
>> To do this the *span*/counts would have to all "land" in a window
>> representing Monday.
>>
>> If I use a SessionWindow w/ TimestampCombiner/EARLIEST then I can make
>> sure they land in this window using .outputWithTimestamp without worrying
>> that I'll be regressing the event timestamp.
>>
>> Except when I use this Combiner/EARLIEST strategy my watermark is held up
>> substantially (and incidentally seems to drag the pipeline).
>>
>> But if I use Beam's default TimestampCombiner/END_OF_WINDOW then I won't
>> be able to output the *span* result at a timestamp representing the
>> Green click.
>>
>> So a single SessionWindow seems out. (Unless I'm missing something.)
>>
>> The only other strategy I can conceive of at the moment is to capture
>> *two* sessions, representing each "leg" of the overall session. One
>> windows on the [Orange,Green] (using END_OF_WINDOW); the other [Green,Blue]
>> (using EARLIEST). Then I can "join" these two to get both legs together and
>> compute the overall span. This seems like a quite complicated way to solve
>> this (simple?) problem.
>>
>> Thoughts? What am I missing?
>>
>


Re: Custom Windowing and TimestampCombiner

2019-11-05 Thread Aaron Dixon
Thanks Reuven,

So is my conclusion correct? That it is illegal for any custom window
function (+ combiner policy) to merge in a way that would regress the
watermark?

What do Runners (eg Dataflow) do if this occurs?

Does the API obligate runners to fail, or can insanity ensue? :)

On Tue, Nov 5, 2019 at 10:55 AM Reuven Lax  wrote:

>
>
> On Tue, Nov 5, 2019 at 8:07 AM Aaron Dixon  wrote:
>
>> I noticed that if I use TimestampCombiner/EARLIEST for session windows
>> that the watermark appears to get held up for sessions that never "close"
>> (or that extend for a long time).
>>
>
> Correct - because the watermark is then being held up by the earliest
> timestamp in any extant session window.
>
>
>> But if I use default (TimestampCombiner/END_OF_WINDOW) the watermark
>> doesn't get held.
>>
>
> Yes - because then the watermark gets held up by the current end of window.
>
>
>>
>> Does this mean that the watermark is adjusted whenever windows are
>> merged, even before they "close"?
>>
>
> In the second case, yes. Every time windows merge, the end of window for
> that key is recalculated. The actual watermark will be the minimum of all
> these end-of-windows (as each window is per key)
>
>
>> If that is the case, and I write a custom WindowFn, is the implication
>> of this that I should never move the `end`/`maxTimestamp` of the new/merged
>> window *backwards* in time?
>>
>>
>>
>>


Custom Windowing and TimestampCombiner

2019-11-05 Thread Aaron Dixon
I noticed that if I use TimestampCombiner/EARLIEST for session windows that
the watermark appears to get held up for sessions that never "close" (or
that extend for a long time).

But if I use default (TimestampCombiner/END_OF_WINDOW) the watermark
doesn't get held.
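
For reference, the combiner in question is the one set on the Window
transform -- roughly, with an illustrative element type and gap duration:

```java
input.apply(
    Window.<T>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
        .withTimestampCombiner(TimestampCombiner.EARLIEST)); // holds the watermark
// vs. the default, which does not:
//     .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
```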

Does this mean that the watermark is adjusted whenever windows are merged,
even before they "close"?

If that is the case, and I write a custom WindowFn, is the implication of
this that I should never move the `end`/`maxTimestamp` of the new/merged
window *backwards* in time?


Re: aggregating over triggered results

2019-10-31 Thread Aaron Dixon
First of all thank you for taking the time on this very clear and helpful
message. Much appreciated.

>I suppose one could avoid doing any pre-aggregation, and emit all of
the events (with reified timestamp) in 60/30-day windows, then have a
DoFn that filters on the events and computes each of the 10-minute
aggregates over the "true" sliding window (4320 outputs). This could
be cheaper if your events are very sparse, will be more expensive if
they're very dense, and it's unclear what the tradeoff will be.

This is exactly what I was doing (trying to do): reify the events and
filter them to compute my own desired window for the trigger. I have
lots of events, but each key has few events (in the thousands); I think
your point is that the events overall would have to be quite sparse for
this to be a win, and even then it's unclear by how much. So I can see why
this is perhaps not a great thread to pursue.

On another note, trying to use *periodic* triggers like this in
*intermediate* pipeline stages and leverage them in downstream aggregations
was something I was trying to do here and in a few other cases. (I'm new to
Beam and triggers seemed fundamental so I expected to not get so lost
trying to use them this way.) But at least at this stage of my
understanding I think this was misplaced... periodic triggers seem
primarily important say at the last stage of a pipeline where you may be
writing updates to an actual sink/table.

In other words, suppose the above (60/30 day sliding) approach turned out to
be more efficient... I still have no idea if, using Beam, I'd be able to
properly regroup on the other side and pick out all the "latest triggered"
events from the rest... or even know when I've got them. This was the
source of my original question, but I'm now thinking this is just not
what people do in Beam pipelines... periodically trigger windows _in the
middle_ of a pipeline. Am I on the right track in this thinking? If so, I
wonder if the API would better reflect this? If it's a doomed strategy to
try to periodically trigger 'into' downstream aggregations, why is the API
so friendly to doing just this?







On Wed, Oct 30, 2019 at 5:37 PM Robert Bradshaw  wrote:

> On Tue, Oct 29, 2019 at 7:01 PM Aaron Dixon  wrote:
> >
> > Thank you, Luke and Robert. Sorry for hitting dev@; I criss-crossed and
> meant to hit user@. But since we're here, could you clarify your two
> points?
>
> No problem. This is veering into dev@ territory anyway :).
>
> > 1) I am under the impression that the 4,000 sliding windows approach (30
> days every 10m) will re-evaluate my combine aggregation every 10m whereas
> with the two-window approach my Combine aggregation would evolve
> iteratively, only merging new results into the aggregation.
> >
> > If there's a cross-window optimization occurring that would allow
> iterative combining _across windows_, given the substantial order of
> magnitude difference in scale at play, is it safe to consider such
> 'internal optimization detail' part of the platform contract (Dataflow's,
> say)? Otherwise it would be hard to lean on this from a production system
> that will live into the future.
>
> OK, let's first define exactly what (I think) you're trying to
> compute. Let's label windows by their (upper) endpoint. So, every 10
> minutes you have a window W_t and an aggregate Aggr(e for e in
> all_events if t - 30days <= timestamp(e) < t).
>
> The way this is computed in Beam is by storing a map W_t ->
> RunningAggregate and whenever we see an element with timestamp T we
> assign it to the set of windows S = {W_t : T in W_t} (in this case
> there would be 30*24*6 = 4320 of them) and subsequently update all the
> running aggregates. When we are sure we've seen all elements up to t
> (the watermark) we release window W_t with its computed aggregate
> downstream.
>
> An alternative that's often proposed, and works only for aligned
> sliding windows, is to instead store a map of 10-minute buckets to
> running aggregates, and whenever an element comes in we add its value
> to the aggregate of that bucket. This side is cheaper, but every time
> the watermark tells us we're able to release a window we then have to
> compute an Aggregate over all 4000 of these buckets.
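>
> In plain Java (illustrative only, not Beam's actual implementation) that
> bucket scheme looks like:
>
> ```java
> static final long BUCKET_MS = TimeUnit.MINUTES.toMillis(10);
> final Map<Long, Long> buckets = new HashMap<>(); // bucket start -> running sum
>
> void onElement(long tsMillis, long value) {
>   buckets.merge(tsMillis - (tsMillis % BUCKET_MS), value, Long::sum);
> }
>
> long releaseWindow(long windowEndMillis) { // aggregate the trailing 30 days
>   long sum = 0;
>   for (long b = windowEndMillis - TimeUnit.DAYS.toMillis(30);
>       b < windowEndMillis; b += BUCKET_MS) {
>     sum += buckets.getOrDefault(b, 0L); // ~4320 lookups per release
>   }
>   return sum;
> }
> ```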
>
> A further extension, if the aggregation function is reversible, is to
> keep a running total, and every time we release a window, we add the
> next bucket's contribution, and remove the previous buckets
> contribution, to this running total. If the computation is not
> reversible, we can compute a "binary tree" of aggregates (e.g. 10-min
> buckets, 20-min buckets, 40-min buckets, ...) and perform log(N)
> aggregations each time and element comes in and log(N) every time a
> window is released.
>
> Each of the

Re: aggregating over triggered results

2019-10-29 Thread Aaron Dixon
Thank you, Luke and Robert. Sorry for hitting dev@; I criss-crossed and meant 
to hit user@. But since we're here, could you clarify your two points?

1) I am under the impression that the 4,000 sliding windows approach (30 days 
every 10m) will re-evaluate my combine aggregation every 10m whereas with the 
two-window approach my Combine aggregation would evolve iteratively, only 
merging new results into the aggregation. 

If there's a cross-window optimization occurring that would allow iterative 
combining _across windows_, given the substantial order of magnitude difference 
in scale at play, is it safe to consider such 'internal optimization detail' 
part of the platform contract (Dataflow's, say)? Otherwise it would be hard to 
lean on this from a production system that will live into the future.

2) When you say "regardless of how the problem is structured" there are 
4,000 stored 'sub-aggregations', even in the two-window approach--why is that 
so? Isn't the volume of panes produced by a trigger a function of what keys 
have actually received new values *in the window*?

Thanks for help in understanding these details. I want to make good use of Beam 
and hope to contribute back at some point (docs/writing etc), once I can come 
to terms with all of these pieces.

On 2019/10/29 20:39:18, Robert Bradshaw  wrote: 
> No matter how the problem is structured, computing 30 day aggregations
> for every 10 minute window requires storing at least 30day/10min =
> ~4000 sub-aggregations. In Beam, the elements themselves are not
> stored in every window, only the intermediate aggregates.
> 
> I second Luke's suggestion to try it out and see if this is indeed a
> prohibitive bottleneck.
> 
> On Tue, Oct 29, 2019 at 1:29 PM Luke Cwik  wrote:
> >
> > You should first try the obvious answer of using a sliding window of 30 
> > days every 10 minutes before you try the 60 days every 30 days.
> > Beam has some optimizations which will assign a value to multiple windows 
> > and only process that value once even if it's in many windows. If that 
> > doesn't perform well, then come back to dev@ and look to optimize.
> >
> > On Tue, Oct 29, 2019 at 1:22 PM Aaron Dixon  wrote:
> >>
> >> Hi I am new to Beam.
> >>
> >> I would like to accumulate data over 30 day period and perform a running 
> >> aggregation over this data, say every 10 minutes.
> >>
> >> I could use a sliding window of 30 days every 10 minutes (triggering at 
> >> end of window) but this seems grossly inefficient (both in terms of # of 
> >> windows at play and # of events duplicated across these windows).
> >>
> >> A more efficient strategy seems to be to use a sliding window of 60 days 
> >> every 30 days -- triggering every 10 minutes -- so that I'm guaranteed to 
> >> have 30 days worth of data aggregated/combined in at least one of the 2 
> >> at-play sliding windows.
> >>
> >> The last piece of this puzzle however would be to do a final global 
> >> aggregation over only the keys from the latest trigger of the earlier 
> >> sliding window.
> >>
> >> But Beam does not seem to offer a way to orchestrate this, even though 
> >> this seems like it would be a pretty common or fundamental ask.
> >>
> >> One thought I had was to re-window in a way that would isolate keys 
> >> triggered at the same time, in the same window but I don't see any 
> >> contracts from Beam that would allow an approach like that.
> >>
> >> What am I missing?
> >>
> >>
> 


aggregating over triggered results

2019-10-29 Thread Aaron Dixon
Hi I am new to Beam.

I would like to accumulate data over 30 day period and perform a running
aggregation over this data, say every 10 minutes.

I could use a sliding window of 30 days every 10 minutes (triggering at end
of window) but this seems grossly inefficient (both in terms of # of
windows at play and # of events duplicated across these windows).
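
Roughly, assuming a keyed PCollection<KV<String, Long>> of events, that
would be:

```java
events
    .apply(Window.<KV<String, Long>>into(
        SlidingWindows.of(Duration.standardDays(30))
            .every(Duration.standardMinutes(10))))
    .apply(Sum.longsPerKey());
```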

A more efficient strategy seems to be to use a sliding window of 60 days
every 30 days -- *triggering* every 10 minutes -- so that I'm guaranteed to
have 30 days worth of data aggregated/combined in at least one of the 2
at-play sliding windows.

The last piece of this puzzle however would be to do a final global
aggregation over *only the keys from the latest trigger of the earlier
sliding window*.

But Beam does not seem to offer a way to orchestrate this, even though this
seems like it would be a pretty common or fundamental ask.

One thought I had was to re-window in a way that would isolate keys
triggered at the same time, in the same window but I don't see any
contracts from Beam that would allow an approach like that.

What am I missing?