Re: Looping timer blog

2019-06-26 Thread Reza Rokni
On Tue, 25 Jun 2019 at 21:20, Jan Lukavský  wrote:

>
> On 6/25/19 1:43 PM, Reza Rokni wrote:
>
>
>
> On Tue, 25 Jun 2019 at 18:12, Jan Lukavský  wrote:
>
>> > The TTL check would be in the same Timer rather than a separate Timer.
>> The max timestamp processed in each OnTimer call would be stored in
>> ValueState and used as the base to know how long it has been since the
>> pipeline last saw an external value for that key.
>>
>> OK, that seems to work, if you store maximum timestamp in a value state
>> (that is, basically you generate per-key watermark).
>>
>> > You can store it in ValueState rather than BagState, but yes you store
>> that value in State ready for the next OnTimer() fire.
>>
>> In my understanding of the problem, I'd say that this approach seems a
>> little suboptimal. Consider the following, when trying to generate OHLC
>> data (open, high, low, close; that is, move the last closing price to the
>> next window's opening price):
>>
>>  - suppose we have four times T1 < T2 < T3 < T4, where times T2 and T4
>> denote "end of windows" (closing times)
>>
>>  - first (in processing time), we receive value for time T3, we cache it
>> in ValueState, we set timer for T4
>>
>>  - next, we receive value for T1 - but we cannot overwrite the value
>> already written for T3, right? What to do then? And will we reset timer to
>> time T3?
>>
>>  - basically, because we received *two* values, both of which are needed
>> and no timer could have been fired in between, we need both values stored
>> to know which value to emit when timer fires. And consider that on batch,
>> the timer fires only after we see all data (without any ordering).
>>
> I assume you are referring to late data rather than out-of-order data (the
> latter being taken care of with the in-memory sort). As discussed in the
> talk, late data is a sharp edge; one solution for late data is to branch it
> away before the GlobalWindow + State DoFn. This can then be output from the
> pipeline into a sink with a marker to initiate a manual procedure for
> correction. Essentially a manual redaction.
>
> Which in-memory sort do you refer to? I'm pretty sure there must be
> sorting involved for this to work, but I'm not quite sure where exactly it
> is in your implementation. You said that you can put data in ValueState
> rather than BagState, so do you have a List as a value in the ValueState?
> Or do you wrap the stateful ParDo with some other sorting logic? And if
> so, how does this work on batch? I suppose that it all has to fit in
> memory. I think this all revolves around the @RequiresTimeSortedInput
> annotation that I propose. Maybe we can cooperate on that? :)
>
Hu... nice, this chat made me notice a bug in the looping timer example
code that we missed, thanks :-). The ValueState timerRunning should
actually be a ValueState minTimestamp, and the check to set the timer
needs to be: if (null, or element.timestamp < timer.timestamp). This also
requires the timer read pattern, as we don't have timer.read():
https://stackoverflow.com/questions/55912522/setting-a-timer-to-the-minimum-timestamp-seen/55912542#55912542.
I will fix this and put up a PR to correct the blog.
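A minimal sketch of the corrected pattern (the KV element type, the names,
and the one-minute interval are illustrative assumptions, not the blog code):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

class LoopingTimerFn extends DoFn<KV<String, Double>, KV<String, Double>> {

  private static final Duration INTERVAL = Duration.standardMinutes(1);

  // Replaces the buggy timerRunning flag: holds the timestamp the looping
  // timer is currently set for, since there is no timer.read().
  @StateId("minTimestamp")
  private final StateSpec<ValueState<Instant>> minTimestampSpec = StateSpecs.value();

  @TimerId("loopingTimer")
  private final TimerSpec loopingTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("minTimestamp") ValueState<Instant> minTimestamp,
      @TimerId("loopingTimer") Timer loopingTimer) {
    Instant current = minTimestamp.read();
    // Set the timer if it was never set, or if this element's timestamp is
    // earlier than the time the timer is currently set for.
    if (current == null || c.timestamp().isBefore(current)) {
      loopingTimer.set(c.timestamp());
      minTimestamp.write(c.timestamp());
    }
    c.output(c.element());
  }

  @OnTimer("loopingTimer")
  public void onTimer(
      OnTimerContext c,
      @StateId("minTimestamp") ValueState<Instant> minTimestamp,
      @TimerId("loopingTimer") Timer loopingTimer) {
    // Emit whatever this interval needs, then loop by resetting the timer.
    Instant next = c.timestamp().plus(INTERVAL);
    loopingTimer.set(next);
    minTimestamp.write(next);
  }
}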

For the hold and propagate pattern (for those following the original
thread: the pattern is not covered in the blog example code, but was
discussed at the summit; a sketch follows this list):
OnProcess()
- You can drop the accumulators into BagState.
- A timer is set at the minimum time interval.
OnTimer()
- The list is sorted in memory; for a lot of timeseries use cases (for
example OHLC) the memory issues are heavily mitigated, as we can use
FixedWindows partial aggregations before the GlobalWindow stage. (Partial
because they don't have the correct Open value set until they flow into
the Global Window.) Of course, how big the window is dictates the
compression you would get.
- The current timer is set again to fire in the next interval window.
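A minimal sketch of that shape (the Candle type, names, and interval are
illustrative assumptions, and a real pipeline would register a coder for
the accumulator):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// A hypothetical partial aggregate produced by the FixedWindows stage.
class Candle implements Serializable {
  Instant timestamp;
  double open, high, low, close;
}

class HoldAndPropagateFn extends DoFn<KV<String, Candle>, KV<String, Candle>> {

  private static final Duration INTERVAL = Duration.standardMinutes(1);

  @StateId("candles")
  private final StateSpec<BagState<Candle>> candlesSpec = StateSpecs.bag();

  @TimerId("emitTimer")
  private final TimerSpec emitTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("candles") BagState<Candle> candles,
      @TimerId("emitTimer") Timer emitTimer) {
    // OnProcess: drop the partial accumulator into BagState and set a timer
    // at the minimum time interval (guarded, in real code, with the
    // minTimestamp pattern from the looping timer sketch above).
    candles.add(c.element().getValue());
    emitTimer.set(c.timestamp());
  }

  @OnTimer("emitTimer")
  public void onTimer(
      OnTimerContext c,
      @StateId("candles") BagState<Candle> candles,
      @TimerId("emitTimer") Timer emitTimer) {
    // OnTimer: sort the accumulated partials in memory...
    List<Candle> sorted = new ArrayList<>();
    candles.read().forEach(sorted::add);
    sorted.sort(Comparator.comparing((Candle candle) -> candle.timestamp));
    // ...propagate Close -> Open across consecutive candles, emit the
    // corrected values, then set the timer again for the next interval.
    emitTimer.set(c.timestamp().plus(INTERVAL));
  }
}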

@RequiresTimeSortedInput looks super interesting, happy to help out.
Although it's a harder problem than the targeted timeseries use cases,
where FixedWindows aggregations can be used before the final step.

>
> Or? Am I missing something?
>>
>> Jan
>> On 6/25/19 6:00 AM, Reza Rokni wrote:
>>
>>
>>
>> On Fri, 21 Jun 2019 at 18:02, Jan Lukavský  wrote:
>>
>>> Hi Reza,
>>>
>>> great presentation at the Beam Summit. I have had a few posts here on
>>> the list during the last few weeks, some of which might actually be related
>>> to both looping timers and validity windows. But maybe you will be able to
>>> see a different approach than I do, so a few questions:
>>>
>>>  a) because of [1] timers might not be exactly ordered (the JIRA talks
>>> about DirectRunner, but I suppose the issue is present on all runners that
>>> use immutable bundles of size > 1, so might be related to Dataflow as
>>> well). This might cause issues when you try to introduce TTL for looping
>>> timers, because the TTL timer might get fired before regular looping timer,
>>> which might cause incorrect results (state cleared 

Re: Return types of Write transforms (aka best way to signal)

2019-06-26 Thread Reza Rokni
The use case of a transform waiting for a Sink or Sinks to complete is
very interesting indeed!

Curious, if a sink internally makes use of a Global Window with processing
time triggers to push its writes, what mechanism could be used to release a
transform waiting for a signal from the Sink(s) that all processing is done
and it can move forward?
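For reference, the basic mechanism from the original thread looks roughly
like this (a sketch; the two write transforms and the String element type
are placeholders, not a specific IO):

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

class WriteThenWrite {
  // Each window of `rows` is held back until the corresponding window of
  // the first write's signal PCollection is done, then flows on to the
  // second write.
  static PDone apply(
      PCollection<String> rows,
      PTransform<PCollection<String>, PCollection<Void>> writeToFirstDb,
      PTransform<PCollection<String>, PDone> writeToSecondDb) {
    PCollection<Void> firstWriteDone = rows.apply("WriteToFirstDb", writeToFirstDb);
    return rows
        .apply(Wait.on(firstWriteDone))
        .apply("WriteToSecondDb", writeToSecondDb);
  }
}

The question above is exactly about what terminates the signal's window:
with a Global Window and processing-time triggers, it is not obvious when
Wait.on would consider a window of the signal "done".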

On Thu, 27 Jun 2019 at 03:58, Robert Bradshaw  wrote:

> Regarding Python, yes and no. Python doesn't distinguish at compile
> time between (1), (2), and (6), but that doesn't mean it isn't part of
> the public API and people might start counting on it, so it's in some
> sense worse. We can also do (3) (which is less cumbersome in Python,
> either returning a tuple or a dict) or (4).
>
> Good point about providing a simple solution (something that can be
> waited on at least) and allowing for with* modifiers to return more.
>
> On Wed, Jun 26, 2019 at 7:08 PM Chamikara Jayalath 
> wrote:
> >
> > BTW regarding Python SDK, I think the answer to this question is simpler
> for Python SDK due to the lack of types. Most examples I know just return a
> PCollection from the Write transform which may or may not be ignored by
> users. If the PCollection is used, the user should be aware of the element
> type of the returned PCollection and should use it accordingly in
> subsequent transforms.
> >
> > Thanks,
> > Cham
> >
> > On Wed, Jun 26, 2019 at 9:57 AM Chamikara Jayalath 
> wrote:
> >>
> >>
> >>
> >> On Wed, Jun 26, 2019 at 5:46 AM Robert Bradshaw 
> wrote:
> >>>
> >>> Good question.
> >>>
> >>> I'm not sure what could be done with (5) if it contains no deferred
> >>> objects (e.g. there's nothing to wait on).
> >>>
> >>> There is also (6) return PCollection. The
> >>> advantage of (2) is that one can migrate to (1) or (6) without
> >>> changing the public API, while giving something to wait on without
> >>> promising anything about its contents.
> >>>
> >>>
> >>> I would probably lean towards (4) for anything that would want to
> >>> return multiple signals/outputs (e.g. successful vs. failed writes)
> >>> and view (3) as being a "cheap" but more cumbersome for the user way
> >>> of writing (4). In both cases, more information can be added in a
> >>> forward-compatible way. Technically (4) could extend (3) if one wants
> >>> to migrate from (3) to (4) to provide a nicer API in the future. (As
> >>> an aside, it would be interesting if any of the schema work that lets
> >>> us get rid of tuple tags for elements (e.g. join operations) could let
> >>> us get rid of tuple tags for PCollectionTuples (e.g. letting a POJO
> >>> with PCollection members be as powerful as a PCollectionTuple).
> >>>
> >>> On Wed, Jun 26, 2019 at 2:23 PM Ismaël Mejía 
> wrote:
> >>> >
> >>> > Beam introduced in version 2.4.0 the Wait transform to delay
> >>> > processing of each window in a PCollection until signaled. This
> opened
> >>> > new interesting patterns for example writing to a database and when
> >>> > ‘fully’ done write to another database.
> >>> >
> >>> > To support this pattern an IO connector Write transform must return a
> >>> > type different from PDone to signal the processing of the next step.
> >>> > Some IOs have already started to implement this return type, but each
> >>> > returned type has different pros and cons so I wanted to open the
> >>> > discussion on this to see if we could somehow find a common pattern
> to
> >>> > suggest IO authors to follow (Note: It may be the case that there is
> >>> > not a pattern that fits certain use cases).
> >>> >
> >>> > So far the approaches in our code base are:
> >>> >
> >>> > 1. Write returns ‘PCollection’
> >>> >
> >>> > This is the simplest case but if subsequent transforms require more
> >>> > data that could have been produced during the write it gets ‘lost’.
> >>> > Used by JdbcIO and DynamoDBIO.
> >>> >
> >>> > 2. Write returns ‘PCollection’
> >>> >
> >>> > We can return whatever we want but the return type is uncertain for
> >>> > the user in case he wants to use information from it. This is less
> >>> > user friendly but has the maintenance advantage of not changing
> >>> > signatures if we want to change the return type in the future. Used
> by
> >>> > RabbitMQIO.
> >>> >
> >>> > 3. Write returns a `PCollectionTuple`
> >>> >
> >>> > It is like (2) but with the advantage of returning an untyped tuple
> of
> >>> > PCollections so we can return more things. Used by SnsIO.
> >>> >
> >>> > 4. Write returns ‘a class that implements POutput’
> >>> >
> >>> > This class wraps inside of the PCollections that were part of the
> >>> > write, e.g. SpannerWriteResult. This is useful because we can be
> >>> > interested on saving inside a PCollection of failed mutations apart
> of
> >>> > the ‘done’ signal. Used by BigQueryIO and SpannerIO. A generics case
> >>> > of this one is used by FileIO for Destinations via:
> >>> > ‘WriteFilesResult’.
> >>> >
> >>> > 5. Write returns ‘a class that implements POutput’ with specific data
> >>> 

Re: Accumulating mode implies that panes are processed in order?

2019-06-26 Thread Robert Bradshaw
On Thu, Jun 27, 2019 at 1:52 AM Rui Wang  wrote:
>>
>>
>>  AFAIK all streaming runners today practically do provide these panes in 
>> order;
>
> Does it refer to "the stage immediately after GBK itself processes fired 
> panes in order" in streaming runners? Could you share more information?
>
>
>>
>> this means that likely many users implicitly rely on this "non guarantee," 
>> probably without even knowing they are relying on it.
>
> If streaming runners have already provided or processed panes in order, and 
> likely many users rely on it already, why not make order of panes a part of 
> model explicitly?

Most runners produce panes in order, but they don't necessarily
preserve the order downstream (at least beyond what's fused into the
same stage, which is where it gets difficult).


Re: Accumulating mode implies that panes are processed in order?

2019-06-26 Thread Rui Wang
>
>
>  AFAIK all streaming runners today practically do provide these panes in
> order;
>
Does it refer to "the stage immediately after GBK itself processes fired
panes in order" in streaming runners? Could you share more information?



> this means that likely many users implicitly rely on this "non guarantee,"
> probably without even knowing they are relying on it.
>
If streaming runners have already provided or processed panes in order, and
likely many users rely on it already, why not make order of panes a part of
model explicitly?


-Rui


Re: Spotless exclusions

2019-06-26 Thread Lukasz Cwik
On Wed, Jun 26, 2019 at 4:22 PM Anton Kedin  wrote:

> Currently our spotless is configured globally [1] (for java at least) to
> include all source files by '**/*.java'. And then we exclude things
> explicitly. Don't know why, but these exclusions are ignored for me
> sometimes, for example `./gradlew :sdks:java:core:spotlessJavaCheck` always
> fails when checking the generated files under
> `.../build/generated-src/antlr/main/org/apache/beam/sdk/schemas/parser/generated`.
>
> Few questions:
>  * can someone point me to a discussion or a jira about this behavior?
>

BEAM-6399 and BEAM-7366 allude to something wonky going on.


>  * do we actually have a use case of checking the source files that are
> not under 'src'?
>

No


>  * if not, can we switch the config to only check for sources under 'src'
> [2]?
>

Yes


>  * alternatively, would it make sense to introduce project-specific
> overrides?
>

All src should be under src/ so it is unlikely to be useful.


>
> [1]
> https://github.com/apache/beam/blob/af9362168606df9ec11319fe706b72466413798c/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L819
> [2] https://github.com/apache/beam/pull/8954
>


Spotless exclusions

2019-06-26 Thread Anton Kedin
Currently our spotless is configured globally [1] (for java at least) to
include all source files by '**/*.java'. And then we exclude things
explicitly. Don't know why, but these exclusions are ignored for me
sometimes, for example `./gradlew :sdks:java:core:spotlessJavaCheck` always
fails when checking the generated files under
`.../build/generated-src/antlr/main/org/apache/beam/sdk/schemas/parser/generated`.

Few questions:
 * can someone point me to a discussion or a jira about this behavior?
 * do we actually have a use case of checking the source files that are not
under 'src'?
 * if not, can we switch the config to only check for sources under 'src'
[2]?
 * alternatively, would it make sense to introduce project-specific
overrides?

[1]
https://github.com/apache/beam/blob/af9362168606df9ec11319fe706b72466413798c/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L819
[2] https://github.com/apache/beam/pull/8954


Re: Accumulating mode implies that panes are processed in order?

2019-06-26 Thread Reuven Lax
Correct, however I think our triggering model is close to useless (or at
least close to unusable) without such a guarantee, for both accumulating
and discarding. What's worse - AFAIK all streaming runners today
practically do provide these panes in order; this means that likely many
users implicitly rely on this "non guarantee," probably without even
knowing they are relying on it.

On Wed, Jun 26, 2019 at 10:02 PM Robert Bradshaw 
wrote:

> There is no promise that panes will arrive in order (especially the
> further you get "downstream"). Though they may be approximately so,
> it's dangerous to assume that. You can inspect the sequential index in
> PaneInfo to determine whether a pane is older than other panes you
> have seen.
>
> On Wed, Jun 26, 2019 at 7:03 PM Rui Wang  wrote:
> >
> > Hi Community,
> >
> > I am trying to understand the Beam model and have a question related to
> accumulating mode and panes:
> >
> > Accumulating mode means every time a trigger fires, it emits all
> values seen so far in a window (so it's called accumulating). An example
> from the Beam programming model guide[1] has a repeating trigger that
> fires every time 3 elements arrive on a 10-min fixed windowing, with the
> following emitted results:
> >
> >   First trigger firing:  [5, 8, 3]
> >   Second trigger firing: [5, 8, 3, 15, 19, 23]
> >   Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
> >
> >
> > The original dataflow paper[2] also mentions that accumulating mode is
> useful to downstream consumers to overwrite old value with new value.
> >
> > In order to support such an "overwriting" use case, it seems to me that
> the Beam model provides:
> > 1. triggers fire in order. In the example above, the second trigger firing
> should happen after the first trigger firing, such that downstream transforms
> see [5, 8, 3] before [5, 8, 3, 15, 19, 23].
> > 2. downstream transforms execute panes in order. If this is not true, it
> might end up with new values (from later panes) being overwritten by old
> values (from earlier panes).
> >
> > Do I have a correct understanding?
> >
> >
> > Thanks,
> > Rui
> >
> >
> >
> > [1]:
> https://beam.apache.org/documentation/programming-guide/#setting-a-trigger
> > [2]: https://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf
>


Re: Accumulating mode implies that panes are processed in order?

2019-06-26 Thread Rui Wang
Thanks! That thread was really helpful!

-Rui

On Wed, Jun 26, 2019 at 1:18 PM Steve Niemitz  wrote:

> There was a thread about this a few months ago as well:
>
> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>
>
> On Wed, Jun 26, 2019 at 4:02 PM Robert Bradshaw 
> wrote:
>
>> There is no promise that panes will arrive in order (especially the
>> further you get "downstream"). Though they may be approximately so,
>> it's dangerous to assume that. You can inspect the sequential index in
>> PaneInfo to determine whether a pane is older than other panes you
>> have seen.
>>
>> On Wed, Jun 26, 2019 at 7:03 PM Rui Wang  wrote:
>> >
>> > Hi Community,
>> >
>> > I am trying to understand the Beam model and have a question related to
>> accumulating mode and panes:
>> >
>> > Accumulating mode means every time a trigger fires, it emits all
>> values seen so far in a window (so it's called accumulating). An example
>> from the Beam programming model guide[1] has a repeating trigger that
>> fires every time 3 elements arrive on a 10-min fixed windowing, with the
>> following emitted results:
>> >
>> >   First trigger firing:  [5, 8, 3]
>> >   Second trigger firing: [5, 8, 3, 15, 19, 23]
>> >   Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
>> >
>> >
>> > The original dataflow paper[2] also mentions that accumulating mode is
>> useful to downstream consumers to overwrite old value with new value.
>> >
>> > In order to support such an "overwriting" use case, it seems to me that
>> the Beam model provides:
>> > 1. triggers fire in order. In the example above, the second trigger firing
>> should happen after the first trigger firing, such that downstream transforms
>> see [5, 8, 3] before [5, 8, 3, 15, 19, 23].
>> > 2. downstream transforms execute panes in order. If this is not true,
>> it might end up with new values (from later panes) being overwritten by old
>> values (from earlier panes).
>> >
>> > Do I have a correct understanding?
>> >
>> >
>> > Thanks,
>> > Rui
>> >
>> >
>> >
>> > [1]:
>> https://beam.apache.org/documentation/programming-guide/#setting-a-trigger
>> > [2]: https://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf
>>
>


Re: Accumulating mode implies that panes are processed in order?

2019-06-26 Thread Steve Niemitz
There was a thread about this a few months ago as well:
https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E


On Wed, Jun 26, 2019 at 4:02 PM Robert Bradshaw  wrote:

> There is no promise that panes will arrive in order (especially the
> further you get "downstream"). Though they may be approximately so,
> it's dangerous to assume that. You can inspect the sequential index in
> PaneInfo to determine whether a pane is older than other panes you
> have seen.
>
> On Wed, Jun 26, 2019 at 7:03 PM Rui Wang  wrote:
> >
> > Hi Community,
> >
> > I am trying to understand the Beam model and have a question related to
> accumulating mode and panes:
> >
> > Accumulating mode means every time a trigger fires, it emits all
> values seen so far in a window (so it's called accumulating). An example
> from the Beam programming model guide[1] has a repeating trigger that
> fires every time 3 elements arrive on a 10-min fixed windowing, with the
> following emitted results:
> >
> >   First trigger firing:  [5, 8, 3]
> >   Second trigger firing: [5, 8, 3, 15, 19, 23]
> >   Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
> >
> >
> > The original dataflow paper[2] also mentions that accumulating mode is
> useful to downstream consumers to overwrite old value with new value.
> >
> > In order to support such an "overwriting" use case, it seems to me that
> the Beam model provides:
> > 1. triggers fire in order. In the example above, the second trigger firing
> should happen after the first trigger firing, such that downstream transforms
> see [5, 8, 3] before [5, 8, 3, 15, 19, 23].
> > 2. downstream transforms execute panes in order. If this is not true, it
> might end up with new values (from later panes) being overwritten by old
> values (from earlier panes).
> >
> > Do I have a correct understanding?
> >
> >
> > Thanks,
> > Rui
> >
> >
> >
> > [1]:
> https://beam.apache.org/documentation/programming-guide/#setting-a-trigger
> > [2]: https://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf
>


Re: Accumulating mode implies that panes are processed in order?

2019-06-26 Thread Robert Bradshaw
There is no promise that panes will arrive in order (especially the
further you get "downstream"). Though they may be approximately so,
it's dangerous to assume that. You can inspect the sequential index in
PaneInfo to determine whether a pane is older than other panes you
have seen.
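A minimal sketch of that check, as a stateful per-key (and per-window)
filter; the names and the Long value type are illustrative assumptions:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class DropStalePanesFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  // Highest pane index seen so far for this key (state is also per window).
  @StateId("maxPaneIndex")
  private final StateSpec<ValueState<Long>> maxPaneIndexSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("maxPaneIndex") ValueState<Long> maxPaneIndex) {
    long newIndex = c.pane().getIndex();
    Long seen = maxPaneIndex.read();
    if (seen == null || newIndex > seen) {
      // Newer pane than anything seen so far: safe to overwrite downstream.
      maxPaneIndex.write(newIndex);
      c.output(c.element());
    }
    // Otherwise the pane is older than one already processed: drop it.
  }
}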

On Wed, Jun 26, 2019 at 7:03 PM Rui Wang  wrote:
>
> Hi Community,
>
> I am trying to understand the Beam model and have a question related to
> accumulating mode and panes:
>
> Accumulating mode means every time a trigger fires, it emits all values
> seen so far in a window (so it's called accumulating). An example from the
> Beam programming model guide[1] has a repeating trigger that fires every
> time 3 elements arrive on a 10-min fixed windowing, with the following
> emitted results:
>
>   First trigger firing:  [5, 8, 3]
>   Second trigger firing: [5, 8, 3, 15, 19, 23]
>   Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
>
>
> The original dataflow paper[2] also mentions that accumulating mode is useful 
> to downstream consumers to overwrite old value with new value.
>
> In order to support such an "overwriting" use case, it seems to me that the
> Beam model provides:
> 1. triggers fire in order. In the example above, the second trigger firing
> should happen after the first trigger firing, such that downstream transforms
> see [5, 8, 3] before [5, 8, 3, 15, 19, 23].
> 2. downstream transforms execute panes in order. If this is not true, it
> might end up with new values (from later panes) being overwritten by old
> values (from earlier panes).
>
> Do I have a correct understanding?
>
>
> Thanks,
> Rui
>
>
>
> [1]: 
> https://beam.apache.org/documentation/programming-guide/#setting-a-trigger
> [2]: https://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf


Re: Return types of Write transforms (aka best way to signal)

2019-06-26 Thread Robert Bradshaw
Regarding Python, yes and no. Python doesn't distinguish at compile
time between (1), (2), and (6), but that doesn't mean it isn't part of
the public API and people might start counting on it, so it's in some
sense worse. We can also do (3) (which is less cumbersome in Python,
either returning a tuple or a dict) or (4).

Good point about providing a simple solution (something that can be
waited on at least) and allowing for with* modifiers to return more.

On Wed, Jun 26, 2019 at 7:08 PM Chamikara Jayalath  wrote:
>
> BTW regarding Python SDK, I think the answer to this question is simpler for 
> Python SDK due to the lack of types. Most examples I know just return a 
> PCollection from the Write transform which may or may not be ignored by 
> users. If the PCollection is used, the user should be aware of the element 
> type of the returned PCollection and should use it accordingly in subsequent 
> transforms.
>
> Thanks,
> Cham
>
> On Wed, Jun 26, 2019 at 9:57 AM Chamikara Jayalath  
> wrote:
>>
>>
>>
>> On Wed, Jun 26, 2019 at 5:46 AM Robert Bradshaw  wrote:
>>>
>>> Good question.
>>>
>>> I'm not sure what could be done with (5) if it contains no deferred
>>> objects (e.g. there's nothing to wait on).
>>>
>>> There is also (6) return PCollection. The
>>> advantage of (2) is that one can migrate to (1) or (6) without
>>> changing the public API, while giving something to wait on without
>>> promising anything about its contents.
>>>
>>>
>>> I would probably lean towards (4) for anything that would want to
>>> return multiple signals/outputs (e.g. successful vs. failed writes)
>>> and view (3) as being a "cheap" but more cumbersome for the user way
>>> of writing (4). In both cases, more information can be added in a
>>> forward-compatible way. Technically (4) could extend (3) if one wants
>>> to migrate from (3) to (4) to provide a nicer API in the future. (As
>>> an aside, it would be interesting if any of the schema work that lets
>>> us get rid of tuple tags for elements (e.g. join operations) could let
>>> us get rid of tuple tags for PCollectionTuples (e.g. letting a POJO
>>> with PCollection members be as powerful as a PCollectionTuple).
>>>
>>> On Wed, Jun 26, 2019 at 2:23 PM Ismaël Mejía  wrote:
>>> >
>>> > Beam introduced in version 2.4.0 the Wait transform to delay
>>> > processing of each window in a PCollection until signaled. This opened
>>> > new interesting patterns for example writing to a database and when
>>> > ‘fully’ done write to another database.
>>> >
>>> > To support this pattern an IO connector Write transform must return a
>>> > type different from PDone to signal the processing of the next step.
>>> > Some IOs have already started to implement this return type, but each
>>> > returned type has different pros and cons so I wanted to open the
>>> > discussion on this to see if we could somehow find a common pattern to
>>> > suggest IO authors to follow (Note: It may be the case that there is
>>> > not a pattern that fits certain use cases).
>>> >
>>> > So far the approaches in our code base are:
>>> >
>>> > 1. Write returns ‘PCollection’
>>> >
>>> > This is the simplest case but if subsequent transforms require more
>>> > data that could have been produced during the write it gets ‘lost’.
>>> > Used by JdbcIO and DynamoDBIO.
>>> >
>>> > 2. Write returns ‘PCollection’
>>> >
>>> > We can return whatever we want but the return type is uncertain for
>>> > the user in case he wants to use information from it. This is less
>>> > user friendly but has the maintenance advantage of not changing
>>> > signatures if we want to change the return type in the future. Used by
>>> > RabbitMQIO.
>>> >
>>> > 3. Write returns a `PCollectionTuple`
>>> >
>>> > It is like (2) but with the advantage of returning an untyped tuple of
>>> > PCollections so we can return more things. Used by SnsIO.
>>> >
>>> > 4. Write returns ‘a class that implements POutput’
>>> >
>>> > This class wraps inside of the PCollections that were part of the
>>> > write, e.g. SpannerWriteResult. This is useful because we can be
>>> > interested on saving inside a PCollection of failed mutations apart of
>>> > the ‘done’ signal. Used by BigQueryIO and SpannerIO. A generics case
>>> > of this one is used by FileIO for Destinations via:
>>> > ‘WriteFilesResult’.
>>> >
>>> > 5. Write returns ‘a class that implements POutput’ with specific data
>>> > (no PCollections)
>>> >
>>> > This is similar to (4) but with the difference that the returned type
>>> > contains the specific data that may be needed next, for example not a
>>> > PCollection but values like the number of rows written. Used by
>>> > BigtableIO (PR in review at the moment). (This can be seen as a
>>> > simpler version of 4).
>>
>>
>> Thanks Ismaël for detailing various approaches with examples.
>>
>> I think current PR for BigTable returns a PCollection  
>> from a PTransform 'WithWriteResults' that can be optionally invoked through 
>> a 

Re: python integration tests flake detection

2019-06-26 Thread Udi Meiri
In lieu of doing a migration to pytest, which is a large effort, I'm trying
to do the same using nose.
Opened https://issues.apache.org/jira/browse/BEAM-7641

On Tue, Jun 25, 2019 at 4:01 PM Udi Meiri  wrote:

> I was thinking that our test infrastructure could use an upgrade to pytest.
>
> Some advantages:
> - It'd allow setting the test suite name. For example, if you look at this
> page you'll find 3 sets of 4 identically named tests with no way to tell
> which tox environment they were run on (all marked as "nosetests").
> - It will hopefully allow a degree of parallelism (if we can solve some
> pickling errors). This will make running unit tests locally much faster.
> - pytest has cleaner progress reporting
> - no more BeamTestPlugin
> - easier inclusion/exclusion of tests (using markers such as: precommit,
> postcommit, no_direct, no_dataflow, etc.)
>
>
> On Tue, Jun 25, 2019 at 10:50 AM Udi Meiri  wrote:
>
>> Yes. It only outputs to one filename though, so it'd need some working
>> around (our ITs might have more than one nose run).
>> Some tests run in docker, so that might need work to get the xml out.
>>
>> On Tue, Jun 25, 2019 at 10:11 AM Ahmet Altay  wrote:
>>
>>> There is a nose plugin [1] for outputting test results in xunit format.
>>> Would that work?
>>>
>>> [1] https://nose.readthedocs.io/en/latest/plugins/xunit.html
>>>
>>> On Tue, Jun 25, 2019 at 10:04 AM Udi Meiri  wrote:
>>>
 The current state of Python post-commit tests is pretty flaky.
 I was wondering if we had any stats for integration tests, to help
 identify which tests are causing the most failures. Jenkins keeps some
 history for tests (example),
 but it requires junit-style .xml output.

 Would it be possible to get our integration test results into Jenkins?

>>>




Re: Using Grafana to display test metrics and alert anomalies

2019-06-26 Thread Mikhail Gryzykhin
Hi Łukasz,

See answers inline.

Regards,
Mikhail.

On Wed, Jun 26, 2019 at 7:47 AM Łukasz Gajowy  wrote:

> Hi Mikhail!
>
> Together with Kamil we're investigating the possibilities of creating
> alerts for anomalies for the metrics collected from various tests (load, IO
> tests, other performance tests). This is unfortunately impossible to do in
> Perfkit explorer tool that we're using for displaying the metrics right now
> [1]. Therefore we're considering a switch to some other solution.
>

> This is why we'd like to ask you some questions about the Community
> Metrics tool. It is set up using Grafana, which has the alerting feature out
> of the box, so it is a natural candidate. Moreover, it lets us keep the
> infrastructure as code, which is also a big plus. Unfortunately, the alerting
> feature does not work with BigQuery as a data source for Grafana [2].
>
I'd say it is worth looking into. Keeping most of the metrics in one place
is much more convenient than having multiple tools in different locations.

>
> The questions:
>
>1. What do you think of adding test related metrics as separate
>dashboards in the existing Grafana instance?
>
> This should be completely fine and I don't see any blockers.

>
>1. We were thinking of setting up a cloud SQL Postgres instance for
>storing test metrics and reference this source in our Grafana dashboards.
>Won't this approach collide in any way with existing setup?
>
> You can reuse the existing PSQL DB that is utilized by Grafana. It's already
hosted in GCP and should be available for you to use. Some permissions might
need configuring though. Even if you utilize a separate DB, Grafana supports
multiple data sources, so there should be no issues.

>
>1. Have you tried setting up alerts in Grafana for community metrics?
>Do you expect any blockers there?
>
We do not have email configured in Grafana; however, we utilize alerts on
the freshness dashboard that are later checked by a metrics prober job.


> I also CCed the devlist for visibility and comments (if any).
>
> Thanks!
> Łukasz
>
> [1] https://s.apache.org/io-test-dashboards
> [2] https://github.com/doitintl/bigquery-grafana/issues/67
>


Re: Return types of Write transforms (aka best way to signal)

2019-06-26 Thread Chamikara Jayalath
BTW regarding Python SDK, I think the answer to this question is simpler
for Python SDK due to the lack of types. Most examples I know just return a
PCollection from the Write transform which may or may not be ignored by
users. If the PCollection is used, the user should be aware of the element
type of the returned PCollection and should use it accordingly in
subsequent transforms.

Thanks,
Cham

On Wed, Jun 26, 2019 at 9:57 AM Chamikara Jayalath 
wrote:

>
>
> On Wed, Jun 26, 2019 at 5:46 AM Robert Bradshaw 
> wrote:
>
>> Good question.
>>
>> I'm not sure what could be done with (5) if it contains no deferred
>> objects (e.g. there's nothing to wait on).
>>
>> There is also (6) return PCollection. The
>> advantage of (2) is that one can migrate to (1) or (6) without
>> changing the public API, while giving something to wait on without
>> promising anything about its contents.
>
>
>> I would probably lean towards (4) for anything that would want to
>> return multiple signals/outputs (e.g. successful vs. failed writes)
>> and view (3) as being a "cheap" but more cumbersome for the user way
>> of writing (4). In both cases, more information can be added in a
>> forward-compatible way. Technically (4) could extend (3) if one wants
>> to migrate from (3) to (4) to provide a nicer API in the future. (As
>> an aside, it would be interesting if any of the schema work that lets
>> us get rid of tuple tags for elements (e.g. join operations) could let
>> us get rid of tuple tags for PCollectionTuples (e.g. letting a POJO
>> with PCollection members be as powerful as a PCollectionTuple).
>>
>> On Wed, Jun 26, 2019 at 2:23 PM Ismaël Mejía  wrote:
>> >
>> > Beam introduced in version 2.4.0 the Wait transform to delay
>> > processing of each window in a PCollection until signaled. This opened
>> > new interesting patterns for example writing to a database and when
>> > ‘fully’ done write to another database.
>> >
>> > To support this pattern an IO connector Write transform must return a
>> > type different from PDone to signal the processing of the next step.
>> > Some IOs have already started to implement this return type, but each
>> > returned type has different pros and cons so I wanted to open the
>> > discussion on this to see if we could somehow find a common pattern to
>> > suggest IO authors to follow (Note: It may be the case that there is
>> > not a pattern that fits certain use cases).
>> >
>> > So far the approaches in our code base are:
>> >
>> > 1. Write returns ‘PCollection’
>> >
>> > This is the simplest case but if subsequent transforms require more
>> > data that could have been produced during the write it gets ‘lost’.
>> > Used by JdbcIO and DynamoDBIO.
>> >
>> > 2. Write returns ‘PCollection’
>> >
>> > We can return whatever we want but the return type is uncertain for
>> > the user in case he wants to use information from it. This is less
>> > user friendly but has the maintenance advantage of not changing
>> > signatures if we want to change the return type in the future. Used by
>> > RabbitMQIO.
>> >
>> > 3. Write returns a `PCollectionTuple`
>> >
>> > It is like (2) but with the advantage of returning an untyped tuple of
>> > PCollections so we can return more things. Used by SnsIO.
>> >
>> > 4. Write returns ‘a class that implements POutput’
>> >
>> > This class wraps inside of the PCollections that were part of the
>> > write, e.g. SpannerWriteResult. This is useful because we can be
>> > interested on saving inside a PCollection of failed mutations apart of
>> > the ‘done’ signal. Used by BigQueryIO and SpannerIO. A generics case
>> > of this one is used by FileIO for Destinations via:
>> > ‘WriteFilesResult’.
>> >
>> > 5. Write returns ‘a class that implements POutput’ with specific data
>> > (no PCollections)
>> >
>> > This is similar to (4) but with the difference that the returned type
>> > contains the specific data that may be needed next, for example not a
>> > PCollection but values like the number of rows written. Used by
>> > BigtableIO (PR in review at the moment). (This can be seen as a
>> > simpler version of 4).
>>
>
> Thanks Ismaël for detailing various approaches with examples.
>
> I think the current PR for BigTable returns a
> PCollection  from a PTransform 'WithWriteResults' that
> can be optionally invoked through a BigTableIO.Write.withWriteResults(). So
> this is closer to (6), which Robert mentioned. But (1) was also discussed as
> an option. The PR is https://github.com/apache/beam/pull/7805 for anybody
> interested.
>
> I think (6) is less cumbersome to implement/use and allows us to easily
> extend the transform through more chaining or by changing the return
> transform through additional "with*" methods to the FooIO.Write class.
>
> Thanks,
> Cham
>
> >
>> > I would like to have your opinions on which approach you think it is
>> > better or worse and arguments if you see other
>> > advantages/disadvantages. I am probably more in the (4) camp but I
>> > feel somehow 

Accumulating mode implies that panes are processed in order?

2019-06-26 Thread Rui Wang
Hi Community,

I am trying to understand the Beam model and have a question related to
accumulating mode and panes:

Accumulating mode means every time a trigger fires, it emits all values
seen so far in a window (so it's called accumulating). An example from the
Beam programming model guide[1] has a repeating trigger that fires every
time 3 elements arrive on a 10-min fixed windowing, with the following
emitted results:

  First trigger firing:  [5, 8, 3]
  Second trigger firing: [5, 8, 3, 15, 19, 23]
  Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
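For reference, a sketch of the windowing configuration behind those
firings, following the guide in [1]:

import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class AccumulatingExample {
  static PCollection<Integer> configure(PCollection<Integer> input) {
    return input.apply(
        Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(10)))
            // Fire every time 3 elements arrive...
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(3)))
            .withAllowedLateness(Duration.ZERO)
            // ...and emit all values seen so far in the window on each firing.
            .accumulatingFiredPanes());
  }
}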


The original dataflow paper[2] also mentions that accumulating mode is
useful to downstream consumers to overwrite old value with new value.

In order to support such an "overwriting" use case, it seems to me that the
Beam model provides:
1. triggers fire in order. In the example above, the second trigger firing
should happen after the first trigger firing, such that downstream transforms
see [5, 8, 3] before [5, 8, 3, 15, 19, 23].
2. downstream transforms execute panes in order. If this is not true, it
might end up with new values (from later panes) being overwritten by old
values (from earlier panes).

Do I have a correct understanding?


Thanks,
Rui



[1]:
https://beam.apache.org/documentation/programming-guide/#setting-a-trigger
[2]: https://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf


Re: Return types of Write transforms (aka best way to signal)

2019-06-26 Thread Chamikara Jayalath
On Wed, Jun 26, 2019 at 5:46 AM Robert Bradshaw  wrote:

> Good question.
>
> I'm not sure what could be done with (5) if it contains no deferred
> objects (e.g. there's nothing to wait on).
>
> There is also (6) return PCollection. The
> advantage of (2) is that one can migrate to (1) or (6) without
> changing the public API, while giving something to wait on without
> promising anything about its contents.


> I would probably lean towards (4) for anything that would want to
> return multiple signals/outputs (e.g. successful vs. failed writes)
> and view (3) as being a "cheap" but more cumbersome for the user way
> of writing (4). In both cases, more information can be added in a
> forward-compatible way. Technically (4) could extend (3) if one wants
> to migrate from (3) to (4) to provide a nicer API in the future. (As
> an aside, it would be interesting if any of the schema work that lets
> us get rid of tuple tags for elements (e.g. join operations) could let
> us get rid of tuple tags for PCollectionTuples (e.g. letting a POJO
> with PCollection members be as powerful as a PCollectionTuple).
>
> On Wed, Jun 26, 2019 at 2:23 PM Ismaël Mejía  wrote:
> >
> > Beam introduced in version 2.4.0 the Wait transform to delay
> > processing of each window in a PCollection until signaled. This opened
> > new interesting patterns for example writing to a database and when
> > ‘fully’ done write to another database.
> >
> > To support this pattern an IO connector Write transform must return a
> > type different from PDone to signal the processing of the next step.
> > Some IOs have already started to implement this return type, but each
> > returned type has different pros and cons so I wanted to open the
> > discussion on this to see if we could somehow find a common pattern to
> > suggest IO authors to follow (Note: It may be the case that there is
> > not a pattern that fits certain use cases).
> >
> > So far the approaches in our code base are:
> >
> > 1. Write returns ‘PCollection’
> >
> > This is the simplest case but if subsequent transforms require more
> > data that could have been produced during the write it gets ‘lost’.
> > Used by JdbcIO and DynamoDBIO.
> >
> > 2. Write returns ‘PCollection’
> >
> > We can return whatever we want but the return type is uncertain for
> > the user in case he wants to use information from it. This is less
> > user friendly but has the maintenance advantage of not changing
> > signatures if we want to change the return type in the future. Used by
> > RabbitMQIO.
> >
> > 3. Write returns a `PCollectionTuple`
> >
> > It is like (2) but with the advantage of returning an untyped tuple of
> > PCollections so we can return more things. Used by SnsIO.
> >
> > 4. Write returns ‘a class that implements POutput’
> >
> > This class wraps inside of the PCollections that were part of the
> > write, e.g. SpannerWriteResult. This is useful because we can be
> > interested on saving inside a PCollection of failed mutations apart of
> > the ‘done’ signal. Used by BigQueryIO and SpannerIO. A generics case
> > of this one is used by FileIO for Destinations via:
> > ‘WriteFilesResult’.
> >
> > 5. Write returns ‘a class that implements POutput’ with specific data
> > (no PCollections)
> >
> > This is similar to (4) but with the difference that the returned type
> > contains the specific data that may be needed next, for example not a
> > PCollection but values like the number of rows written. Used by
> > BigtableIO (PR in review at the moment). (This can be seen as a
> > simpler version of 4).
>

Thanks Ismaël for detailing various approaches with examples.

I think the current PR for BigTable returns a PCollection
from a PTransform 'WithWriteResults' that can be optionally invoked through
a BigTableIO.Write.withWriteResults(). So this is closer to (6), which Robert
mentioned. But (1) was also discussed as an option. The PR is
https://github.com/apache/beam/pull/7805 for anybody interested.

I think (6) is less cumbersome to implement/use and allows us to easily
extend the transform through more chaining or by changing the return
transform through additional "with*" methods to the FooIO.Write class.

Thanks,
Cham

>
> > I would like to have your opinions on which approach you think it is
> > better or worse and arguments if you see other
> > advantages/disadvantages. I am probably more in the (4) camp but I
> > feel somehow attracted by the flexibility that the lack of strict
> > typing brings in (2, 3) in case of changes to the public IO API (of
> > course this can be contested too).
> >
> > Any other ideas, preferences, issues we may be missing?
>
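For readers comparing the options, a minimal sketch of option (4), in the
spirit of SpannerWriteResult / WriteFilesResult (all names here are
illustrative):

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;

class MyWriteResult implements POutput {

  private final Pipeline pipeline;
  private final TupleTag<Void> doneTag = new TupleTag<>();
  private final TupleTag<String> failedTag = new TupleTag<>();
  private final PCollection<Void> done;            // something to Wait.on()
  private final PCollection<String> failedWrites;  // e.g. failed mutations

  MyWriteResult(
      Pipeline pipeline, PCollection<Void> done, PCollection<String> failedWrites) {
    this.pipeline = pipeline;
    this.done = done;
    this.failedWrites = failedWrites;
  }

  PCollection<Void> getDoneSignal() { return done; }

  PCollection<String> getFailedWrites() { return failedWrites; }

  @Override
  public Pipeline getPipeline() { return pipeline; }

  @Override
  public Map<TupleTag<?>, PValue> expand() {
    Map<TupleTag<?>, PValue> outputs = new LinkedHashMap<>();
    outputs.put(doneTag, done);
    outputs.put(failedTag, failedWrites);
    return outputs;
  }

  @Override
  public void finishSpecifyingOutput(
      String transformName, PInput input, PTransform<?, ?> transform) {}
}

New getters and PCollections can be added to such a class later without
changing the Write signature, which is the forward-compatibility argument
made for (4) above.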


Using Grafana to display test metrics and alert anomalies

2019-06-26 Thread Łukasz Gajowy
Hi Mikhail!

Together with Kamil we're investigating the possibilities of creating
alerts for anomalies for the metrics collected from various tests (load, IO
tests, other performance tests). This is unfortunately impossible to do in
Perfkit explorer tool that we're using for displaying the metrics right now
[1]. Therefore we're considering a switch to some other solution.

This is why we'd like to ask you some questions about the Community Metrics
tool. It is set up using Grafana, which has the alerting feature out of the
box, so it is a natural candidate. Moreover, it lets us keep the
infrastructure as code, which is also a big plus. Unfortunately, the alerting
feature does not work with BigQuery as a data source for Grafana [2].

The questions:

   1. What do you think of adding test related metrics as separate
   dashboards in the existing Grafana instance?
   2. We were thinking of setting up a cloud SQL Postgres instance for
   storing test metrics and reference this source in our Grafana dashboards.
   Won't this approach collide in any way with existing setup?
   3. Have you tried setting up alerts in Grafana for community metrics? Do
   you expect any blockers there?

I also CCed the devlist for visibility and comments (if any).

Thanks!
Łukasz

[1] https://s.apache.org/io-test-dashboards
[2] https://github.com/doitintl/bigquery-grafana/issues/67


Re: Plan for dropping python 2 support

2019-06-26 Thread Robert Bradshaw
On Sat, Jun 22, 2019 at 1:09 AM Valentyn Tymofieiev  wrote:
>
> On Tue, Jun 18, 2019 at 2:01 PM Ahmet Altay  wrote:
>>
>> Thank you for the update, very helpful. It might be worthwhile to share a 
>> version of this with user mailing list after 2.14.
>
>
> I think so too; we can send an update to the user list when 2.14.0 is released.
>
>>
>> Remaining question for me is: There is no plan for an LTS release currently. 
>> Would it make sense for us to target one after known remaining issues are 
>> mostly fixed. What release would that be?
>
>
> From Python 3 standpoint, 2.16.0 could be a good target for an LTS release, 
> that would give us two more releases in 2019 where we can mark Python 2 
> support as deprecated and remove it with the first release in 2020.

Our LTS policy (such as it is) is to designate them after they've
proven themselves, not ahead of time. That being said, support for 3.x
is a good criterion for a candidate to be declared as the LTS one.

>> On Tue, Jun 18, 2019 at 12:08 AM Valentyn Tymofieiev  
>> wrote:
>>>
>>> To give a better understanding where we are w.r.t. Python 3,  I'd like to 
>>> give a quick overview of the recent work that has been happening in Beam 
>>> community to support Python 3, and to summarize the current status of this 
>>> effort.
>>>
>>> Current status:
>>>
>>> Beam 2.11.0 was the first release that offered Python 3 support, 
>>> specifically Python 3.5 support. Due to several limitations that have been 
>>> fixed since 2.11.0, Beam 2.13.0 (or newer version) is recommended for 
>>> Python 3 pipelines.
>>>
>>> Pipelines running on Portable Flink / Spark runners may have to use Beam 
>>> 2.14.0 once it becomes available.
>>>
>>> Python 3.5 or newer version of the interpreter is required to install Beam 
>>> and run Python 3 pipelines.
>>>
>>>
>>> Known remaining limitations of current Python 3 offering:
>>>
>>> Several syntactic constructs introduced in Python 3 (keyword-only 
>>> arguments, dataclasses), are not yet supported. See: BEAM-5878, BEAM-7284.
>>>
>>> Pickling errors occasionally prevent usage of --save_main_session flag, but 
>>> changes to the pipeline code may help to overcome this limitation. See: 
>>> BEAM-6158, BEAM-7540
>>>
>>> Beam has limited type inference capabilities support in Python 3.6+, and 
>>> type checking of Beam typehints is not always enforced, see: BEAM-2713, 
>>> BEAM-7377.
>>>
>>>
>>> The cause of limitations 1-2 largely lies in Beam dependency 'dill' that 
>>> supports pickling. In the immediate future we will be working on evaluating 
>>> replacements or/and fixes to address this. We are also working on an 
>>> improved typehints support in Python 3, see: BEAM-2713.
>>>
>>>
>>> The efforts to make Beam codebase Python3-compatible started back in 2017. 
>>> Most of this work is visible in BEAM-1251[1] and in Kanban Board [2].
>>>
>>>
>>> 2017:
>>>
>>> BEAM-1251 is opened, and first efforts to make Beam codebase 
>>> Python3-compatible followed shortly.
>>>
>>>
>>> Q3-Q4 2018:
>>>
>>> Active work on "futurizing" Beam codebase piece-by-piece while preventing 
>>> regressions in performance in existing Python 2 offering.
>>>
>>> Building test infrastructure to incorporate Python 3 test scenarios.
>>>
>>>
>>> Apache Beam 2.11.0 (Q1 2019):
>>>
>>> "Futurization" of Beam Python codebase completed.
>>>
>>> Apache Beam 2.11.0 is released with Python 3 support, with limitations.
>>>
>>> Continuous pre-commit and post-commit test suites added for Python 3.5.
>>>
>>> Gaps in Python 3 support in Datastore IO, Avro IO, Bigquery IO identified 
>>> and scoped.
>>>
>>> Continuous testing mostly limited to Python 3.5.
>>>
>>>
>>> Apache Beam 2.12.0 (Q2 2019):
>>>
>>> Pre and Post-commit test coverage expanded to Python 3.5, 3.6, 3.7.
>>>
>>> Direct and Dataflow runners added support for Python 3.6 - 3.7.
>>>
>>>
>>> Apache Beam 2.13.0 (Q2 2019)
>>>
>>> Avro IO support enabled on Python 3.
>>>
>>> Datastore IO support enabled on Python 3.
>>>
>>> Bigquery IO support for BYTES datatype enabled on Python 3.
>>>
>>>
>>> Apache Beam 2.14.0 (to be released in Q3 2019)
>>>
>>> Python 3 bug fixes for Bigquery IO and Portable Runner
>>>
>>> Every Python SDK commit exercises Direct, Dataflow, and Portable Flink 
>>> runners on Python 3 in various test suites.
>>>
>>> Beam 2.14.0 will declare Python 3.5, 3.6, 3.7 support in PyPi.
>>>
>>>
>>> Next steps:
>>>
>>> Address known limitations and user feedback.
>>>
>>> Increase Python 3 test coverage in portable runner.
>>>
>>> Assist Beam users in Python 2 -> Python 3 migration.
>>>
>>> Deprecate of Python 2 support in Beam, cleanup the codebase.
>>>
>>>
>>> I'd like to thank all Beam contributors who have been helping to push this 
>>> effort so far.
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-1251
>>>
>>> [2] 
>>> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=245&view=detail
>>>
>>>
>>> On Tue, Jun 18, 2019 at 12:03 AM Valentyn Tymofieiev  
>>> wrote:

 I like the 

Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Reuven Lax
Earlier than the input watermark only applies to event time timers, but the
above problem holds for processing time timers as well.
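For readers joining here, the race from the original thread restated as a
minimal, illustrative event-time sketch (names and intervals are
assumptions):

import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class TimerOrderFn extends DoFn<KV<String, String>, String> {

  @TimerId("first")
  private final TimerSpec firstSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @TimerId("second")
  private final TimerSpec secondSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @TimerId("first") Timer first,
      @TimerId("second") Timer second) {
    // At T0, set timers for T1 and T3.
    first.set(c.timestamp().plus(Duration.standardMinutes(1)));   // T1
    second.set(c.timestamp().plus(Duration.standardMinutes(3)));  // T3
  }

  @OnTimer("first")
  public void onFirst(OnTimerContext c, @TimerId("first") Timer first) {
    // Firing at T1 re-sets this timer for T2 < T3. If the watermark jumped
    // past T3, then T1 and T3 are delivered in the same bundle and T2 only
    // in the next one, so the observed firing order is T1, T3, T2.
    first.set(c.timestamp().plus(Duration.standardMinutes(1)));   // T2
  }

  @OnTimer("second")
  public void onSecond(OnTimerContext c) {
    // T3 handler; think of a window GC timer racing with user timers.
  }
}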

On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw  wrote:

> Yeah, it wouldn't be optimal performance-wise, but I think it's good
> to keep the bar for a correct SDK low. Might still be better than
> sending one timer per bundle, and you only pay the performance if
> timers are set earlier than the input watermark (and there was a timer
> firing in this range). (How often this happens probably varies a lot
> in practice.)
>
> On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax  wrote:
> >
> > This would have a lot of performance problems (especially since there is
> user code that caches within a bundle, and invalidates the cache at the end
> of every bundle). However this would be a valid "lazy" implementation.
> >
> > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw 
> wrote:
> >>
> >> Note also that a "lazy" SDK implementation would be to simply return
> >> all the timers (as if they were new timers) to runner once a timer set
> >> (before or at the last requested timer in the bundle) is encountered.
> >> E.g. Suppose we had timers T1, T3, T5 in the bundle. On firing T1, we
> >> set T2 and delete T3. The SDK could then claim that timers were
> >> (again) set at T3, T5, then set one at T2 and deleted at T3, and
> >> then be done with the bundle (not actually processing T3 and T5). (One
> >> way to think about this is that timers are actually bundle splits into
> >> a bundle of "done" and "future" work.) A more intelligent SDK could,
> >> of course, process the whole bundle by tracking modifications to the
> >> to-be-fired timers itself rather than requiring a trip through the
> >> runner.
> >>
> >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax  wrote:
> >> >
> >> > I like this option the best. It might be trickier to implement, but
> seems like it would be the most consistent solution.
> >> >
> >> > Another problem it would solve is the following: let's say a bundle
> arrives containing timers T1 and T2, and while processing T1 the user code
> deletes T2 (or resets it to a time in the far future). I'm actually not
> sure what we do today, but I'm a bit afraid that we will go ahead and fire
> T2 since it's already in the bundle, which is clearly incorrect. The SDK
> needs to keep track of this and skip T2 in order to solve this, which is
> the same sort of work needed to implement Robert's suggestion.
> >> >
> >> > Reuven
> >> >
> >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw 
> wrote:
> >> >>
> >> >> Another option, that is nice from an API perspective but places a
> >> >> burden on SDK implementers (and possibly runners), is to maintain the
> >> >> ordering of timers by requiring timers to be fired in order, and if
> >> >> any timers are set to fire them immediately before processing later
> >> >> timers. In other words, if T1 sets T2 and modifies T3, these would
> >> >> take effect (locally, the runner may not even know about T2) before
> T3
> >> >> was processed.
> >> >>
> >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský 
> wrote:
> >> >> >
> >> >> > Hi,
> >> >> >
> >> >> > I have mentioned an issue I have come across [1] on several other
> >> >> > threads, but it probably didn't attract the attention that it
> would desire.
> >> >> >
> >> >> > I will try to restate the problem here for clarity:
> >> >> >
> >> >   - on runners that use the concept of bundles (the original issue
> mentions
> >> >> > DirectRunner, but it will probably apply for other runners, which
> use
> >> >> > bundles, as well), the workflow is as follows:
> >> >> >
> >> >> >a) process elements in bundle
> >> >> >
> >> >> >b) advance watermark
> >> >> >
> >> >> >c) process timers
> >> >> >
> >> >> >d) continue to next bundle
> >> >> >
> >> >> >   - the issue with this is that when we are initially at time T0,
> set
> >> >> > two timers for T1 and T3, then advance watermark to T3 (or
> beyond), the
> >> >> > timers will fire (correctly) in order T1, T3, but if timer at T1
> sets
> >> >> > another timer for T2, then this timer will be fired in next bundle
> (and
> >> >> > therefore after T3)
> >> >> >
> >> >> >   - this causes issues mostly with race conditions in window GC
> timers
> >> >> > and user timers (and users don't have any way to solve that!)
> >> >> >
> >> >> >   - note that the same applies when one timer tries to reset timer
> that
> >> >> > is already in the current bundle
> >> >> >
> >> >> > I have investigated a way of solving this by running timers only
> for
> >> >> > single timestamp (instant) at each bundle, but as Reuven pointed
> out,
> >> >> > that could regress performance (mostly by delaying firing of
> timers,
> >> >> > that could have fired). Options I see:
> >> >> >
> >> >> >   1) either set the OnTimerContext#timestamp() to current input
> >> >> > watermark (not the time that user actually set the timer), or
> >> >> >
> >> >> >   2) add 

Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Jan Lukavský
That sounds very interesting. I will update the JIRA with link to this 
discussion and then I will have a look if this can be easily implemented 
in the DirectRunner.


Thanks for this discussion!

On 6/26/19 2:50 PM, Robert Bradshaw wrote:

Yeah, it wouldn't be optimal performance-wise, but I think it's good
to keep the bar for a correct SDK low. Might still be better than
sending one timer per bundle, and you only pay the performance if
timers are set earlier than the input watermark (and there was a timer
firing in this range). (How often this happens probably varies a lot
in practice.)

On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax  wrote:

This would have a lot of performance problems (especially since there is user code that 
caches within a bundle, and invalidates the cache at the end of every bundle). However 
this would be a valid "lazy" implementation.

On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw  wrote:

Note also that a "lazy" SDK implementation would be to simply return
all the timers (as if they were new timers) to runner once a timer set
(before or at the last requested timer in the bundle) is encountered.
E.g. Suppose we had timers T1, T3, T5 in the bundle. On firing T1, we
set T2 and delete T3. The SDK could then claim that timers were
(again) set at T3, T5, then set one at T2 and deleted at T3, and
then be done with the bundle (not actually processing T3 and T5). (One
way to think about this is that timers are actually bundle splits into
a bundle of "done" and "future" work.) A more intelligent SDK could,
of course, process the whole bundle by tracking modifications to the
to-be-fired timers itself rather than requiring a trip through the
runner.

On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax  wrote:

I like this option the best. It might be trickier to implement, but seems like 
it would be the most consistent solution.

Another problem it would solve is the following: let's say a bundle arrives 
containing timers T1 and T2, and while processing T1 the user code deletes T2 
(or resets it to a time in the far future). I'm actually not sure what we do 
today, but I'm a bit afraid that we will go ahead and fire T2 since it's 
already in the bundle, which is clearly incorrect. The SDK needs to keep track 
of this and skip T2 in order to solve this, which is the same sort of work 
needed to implement Robert's suggestion.

Reuven

On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw  wrote:

Another option, that is nice from an API perspective but places a
burden on SDK implementers (and possibly runners), is to maintain the
ordering of timers by requiring timers to be fired in order, and if
any timers are set to fire them immediately before processing later
timers. In other words, if T1 sets T2 and modifies T3, these would
take effect (locally, the runner may not even know about T2) before T3
was processed.

On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský  wrote:

Hi,

I have mentioned an issue I have come across [1] on several other
threads, but it probably didn't attract the attention that it would desire.

I will try to restate the problem here for clarity:

   - on runners that use concept of bundles (the original issue mentions
DirectRunner, but it will probably apply for other runners, which use
bundles, as well), the workflow is as follows:

a) process elements in bundle

b) advance watermark

c) process timers

d) continue to next bundle

   - the issue with this is that when we are initially at time T0, set
two timers for T1 and T3, then advance watermark to T3 (or beyond), the
timers will fire (correctly) in order T1, T3, but if timer at T1 sets
another timer for T2, then this timer will be fired in next bundle (and
therefore after T3)

   - this causes issues mostly with race conditions in window GC timers
and user timers (and users don't have any way to solve that!)

   - note that the same applies when one timer tries to reset timer that
is already in the current bundle

I have investigated a way of solving this by running timers only for
single timestamp (instant) at each bundle, but as Reuven pointed out,
that could regress performance (mostly by delaying firing of timers,
that could have fired). Options I see:

   1) either set the OnTimerContext#timestamp() to current input
watermark (not the time that user actually set the timer), or

   2) add OnTimerContext#getCurrentInputWatermark() and disallow setting
(or resetting) timers for time between OnProcessContext#timestamp and
OnProcessContext#getCurrentInputWatermark(), by throwing an exception

   3) any other option?

Option 1) seems to be broken by design, as it can result in corrupt data
(emitted with wrong timestamp, which is even somewhat arbitrary), I'm
including it just for completeness. Option 2) is breaking change, that
can result in PIpeline failures (although the failures will happen on
Pipelines, that are probably already broken).

Although I have come with a workaround in the work where I originally

Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Robert Bradshaw
Yeah, it wouldn't be optimal performance-wise, but I think it's good
to keep the bar for a correct SDK low. Might still be better than
sending one timer per bundle, and you only pay the performance cost if
timers are set earlier than the input watermark (and there was a timer
firing in this range). (How often this happens probably varies a lot
in practice.)

On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax  wrote:
>
> This would have a lot of performance problems (especially since there is user 
> code that caches within a bundle, and invalidates the cache at the end of 
> every bundle). However this would be a valid "lazy" implementation.

Re: Return types of Write transforms (aka best way to signal)

2019-06-26 Thread Robert Bradshaw
Good question.

I'm not sure what could be done with (5) if it contains no deferred
objects (i.e. there's nothing to wait on).

There is also (6): return a concretely typed PCollection. The
advantage of (2) is that one can migrate to (1) or (6) without
changing the public API, while giving something to wait on without
promising anything about its contents.

I would probably lean towards (4) for anything that would want to
return multiple signals/outputs (e.g. successful vs. failed writes)
and view (3) as a "cheap", but more cumbersome for the user, way of
writing (4). In both cases, more information can be added in a
forward-compatible way. Technically (4) could extend (3) if one wants
to migrate from (3) to (4) to provide a nicer API in the future. (As
an aside, it would be interesting if any of the schema work that lets
us get rid of tuple tags for elements (e.g. join operations) could let
us get rid of tuple tags for PCollectionTuples, e.g. letting a POJO
with PCollection members be as powerful as a PCollectionTuple.)



Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Reuven Lax
This would have a lot of performance problems (especially since there is
user code that caches within a bundle, and invalidates the cache at the end
of every bundle). However this would be a valid "lazy" implementation.

On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw  wrote:

> Note also that a "lazy" SDK implementation would be to simply return
> all the timers (as if they were new timers) to the runner once a timer
> set (before or at the last requested timer in the bundle) is
> encountered. E.g. suppose we had timers T1, T3, T5 in the bundle. On
> firing T1, we set T2 and delete T3. The SDK could then claim that
> timers were (again) set at T3 and T5, with one newly set at T2 and one
> deleted at T3, and then be done with the bundle (not actually
> processing T3 and T5). (One way to think about this is that firing
> timers actually splits the bundle into a bundle of "done" and "future"
> work.) A more intelligent SDK could, of course, process the whole
> bundle by tracking modifications to the to-be-fired timers itself
> rather than requiring a trip through the runner.

Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Robert Bradshaw
Note also that a "lazy" SDK implementation would be to simply return
all the timers (as if they were new timers) to the runner once a timer
set (before or at the last requested timer in the bundle) is
encountered. E.g. suppose we had timers T1, T3, T5 in the bundle. On
firing T1, we set T2 and delete T3. The SDK could then claim that
timers were (again) set at T3 and T5, with one newly set at T2 and one
deleted at T3, and then be done with the bundle (not actually
processing T3 and T5). (One way to think about this is that firing
timers actually splits the bundle into a bundle of "done" and "future"
work.) A more intelligent SDK could, of course, process the whole
bundle by tracking modifications to the to-be-fired timers itself
rather than requiring a trip through the runner.
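
A rough sketch of this lazy strategy, with plain Long timestamps standing
in for real timers and the harness API; timer deletion is omitted for
brevity, and nothing here is actual SDK code:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

class LazyBundle {
  // Fires the bundle's timers in timestamp order. As soon as a firing
  // sets a timer at or before the last timer still pending in this
  // bundle, stop and return the pending timers (plus the new ones) to
  // the runner as if they had just been set.
  static List<Long> fire(List<Long> bundleTimers, Function<Long, List<Long>> onTimer) {
    Deque<Long> pending = new ArrayDeque<>(bundleTimers); // assumed sorted by timestamp
    List<Long> setForRunner = new ArrayList<>();
    while (!pending.isEmpty()) {
      long t = pending.poll();
      List<Long> newlySet = onTimer.apply(t);
      long lastPending = pending.isEmpty() ? Long.MIN_VALUE : pending.peekLast();
      for (long n : newlySet) {
        if (n <= lastPending) {
          // e.g. bundle {T1, T3, T5}: firing T1 set T2, so report T3 and
          // T5 as (again) set, plus T2, and end the bundle here.
          setForRunner.addAll(pending);
          setForRunner.addAll(newlySet);
          return setForRunner;
        }
      }
      setForRunner.addAll(newlySet); // strictly later timers are safe to report
    }
    return setForRunner;
  }
}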

On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax  wrote:
>
> I like this option the best. It might be trickier to implement, but seems 
> like it would be the most consistent solution.
>
> Another problem it would solve is the following: let's say a bundle arrives 
> containing timers T1 and T2, and while processing T1 the user code deletes T2 
> (or resets it to a time in the far future). I'm actually not sure what we do 
> today, but I'm a bit afraid that we will go ahead and fire T2 since it's 
> already in the bundle, which is clearly incorrect. The SDK needs to keep 
> track of this and skip T2 in order to solve this, which is the same sort of 
> work needed to implement Robert's suggestion.
>
> Reuven


Return types of Write transforms (aka best way to signal)

2019-06-26 Thread Ismaël Mejía
Beam introduced in version 2.4.0 the Wait transform to delay
processing of each window in a PCollection until signaled. This opened
up interesting new patterns, for example writing to a database and,
when ‘fully’ done, writing to another database.

To support this pattern, an IO connector's Write transform must return a
type different from PDone in order to signal the next step. Some IOs
have already started to implement this return type, but each returned
type has different pros and cons, so I wanted to open the discussion on
this to see if we could somehow find a common pattern to suggest that IO
authors follow (note: it may be the case that there is no pattern that
fits certain use cases).
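
For illustration, a sketch of the pattern; FooIO and BarIO are
hypothetical connectors whose write() returns a PCollection<Void>
signal rather than PDone:

import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

class TwoPhaseWrite {
  static void expand(PCollection<String> rows) {
    PCollection<Void> firstDone =
        rows.apply("WriteToFoo", FooIO.write());  // hypothetical: returns a signal
    rows.apply(Wait.on(firstDone))                // hold each window until signaled
        .apply("WriteToBar", BarIO.write());      // runs once Foo is 'fully' done
  }
}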

So far the approaches in our code base are:

1. Write returns ‘PCollection<Void>’

This is the simplest case, but if subsequent transforms require more
data that could have been produced during the write, that data gets
‘lost’. Used by JdbcIO and DynamoDBIO.

2. Write returns ‘PCollection<?>’

We can return whatever we want but the return type is uncertain for
the user in case he wants to use information from it. This is less
user friendly but has the maintenance advantage of not changing
signatures if we want to change the return type in the future. Used by
RabbitMQIO.

3. Write returns a `PCollectionTuple`

It is like (2) but with the advantage of returning an untyped tuple of
PCollections so we can return more things. Used by SnsIO.

4. Write returns ‘a class that implements POutput’

This class wraps the PCollections that were produced as part of the
write, e.g. SpannerWriteResult. This is useful because we may be
interested in keeping a PCollection of failed mutations apart from the
‘done’ signal. Used by BigQueryIO and SpannerIO (a minimal sketch of
this approach follows this list). A generic case of this one is used
by FileIO for destinations via ‘WriteFilesResult<DestinationT>’.

5. Write returns ‘a class that implements POutput’ with specific data
(no PCollections)

This is similar to (4) but with the difference that the returned type
contains the specific data that may be needed next, for example not a
PCollection but values like the number of rows written. Used by
BigtableIO (PR in review at the moment). (This can be seen as a
simpler version of 4).
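
The minimal sketch of option 4 referenced above, loosely modeled on the
SpannerWriteResult idea; the class and accessor names are illustrative
assumptions, not an existing API:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;

class MyWriteResult implements POutput {
  private final Pipeline pipeline;
  private final PCollection<Void> done;          // the 'done' signal, usable with Wait.on
  private final PCollection<String> failedRows;  // extra info for downstream steps
  private final TupleTag<Void> doneTag = new TupleTag<>();
  private final TupleTag<String> failedTag = new TupleTag<>();

  MyWriteResult(Pipeline pipeline, PCollection<Void> done, PCollection<String> failedRows) {
    this.pipeline = pipeline;
    this.done = done;
    this.failedRows = failedRows;
  }

  public PCollection<Void> getDone() { return done; }
  public PCollection<String> getFailedRows() { return failedRows; }

  @Override
  public Pipeline getPipeline() { return pipeline; }

  @Override
  public Map<TupleTag<?>, PValue> expand() {
    Map<TupleTag<?>, PValue> out = new HashMap<>();
    out.put(doneTag, done);
    out.put(failedTag, failedRows);
    return out;
  }

  @Override
  public void finishSpecifyingOutput(
      String transformName, PInput input, PTransform<?, ?> transform) {}
}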

I would like to hear your opinions on which approaches you think are
better or worse, and your arguments if you see other
advantages/disadvantages. I am probably more in the (4) camp, but I
feel somewhat attracted by the flexibility that the lack of strict
typing brings in (2, 3) in case of changes to the public IO API (of
course this can be contested too).

Any other ideas, preferences, issues we may be missing?


Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Reuven Lax
I like this option the best. It might be trickier to implement, but seems
like it would be the most consistent solution.

Another problem it would solve is the following: let's say a bundle arrives
containing timers T1 and T2, and while processing T1 the user code deletes
T2 (or resets it to a time in the far future). I'm actually not sure what
we do today, but I'm a bit afraid that we will go ahead and fire T2 since
it's already in the bundle, which is clearly incorrect. The SDK needs to
keep track of this and skip T2 in order to solve this, which is the same
sort of work needed to implement Robert's suggestion.

Reuven
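
A sketch of the bookkeeping this implies, with illustrative names (no
such class exists in the SDK): before firing a timer delivered in the
bundle, check whether earlier processing in the same bundle deleted or
moved it.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class BundleTimerTracker {
  private final Map<String, Long> resetTo = new HashMap<>(); // timerId -> new timestamp
  private final Set<String> deleted = new HashSet<>();

  void userSetTimer(String id, long ts) {
    deleted.remove(id);
    resetTo.put(id, ts);
  }

  void userDeleteTimer(String id) {
    deleted.add(id);
    resetTo.remove(id);
  }

  // True iff a timer delivered in this bundle should still fire.
  boolean shouldFire(String id, long deliveredTs) {
    if (deleted.contains(id)) {
      return false; // e.g. T2 was deleted while T1 fired
    }
    Long moved = resetTo.get(id);
    return moved == null || moved == deliveredTs; // skip if reset to another time
  }
}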

On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw 
wrote:

> Another option, that is nice from an API perspective but places a
> burden on SDK implementers (and possibly runners), is to maintain the
> ordering of timers by requiring timers to be fired in order and, if
> any timers are set, to fire them immediately before processing later
> timers. In other words, if T1 sets T2 and modifies T3, these would
> take effect (locally, the runner may not even know about T2) before T3
> was processed.


Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Jan Lukavský
Don't get me wrong, I think this would be the best option; I just 
discarded it at the beginning. Maybe it can be done, but I'm not able 
to tell. From what I saw in the DirectRunner, it works so that timers 
are first extracted from TimerInternals and then executed, regardless 
of the outcome. It seems to be designed to work exactly that way, so 
the consequences of changing this might be huge (or not :)).


On 6/26/19 1:07 PM, Robert Bradshaw wrote:

Bundles would still be immutable pieces of work. E.g. in this case, T2
would never be sent to the runner.




Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Robert Bradshaw
Bundles would still be immutable pieces of work. E.g. in this case, T2
would never be sent to the runner.

On Wed, Jun 26, 2019 at 1:02 PM Jan Lukavský  wrote:
>
> I think that this approach breaks the assumption that bundles are
> executed as immutable pieces of work. This way, runners would have to
> update the bundle while executing it. It is another possible option, but
> seems to have issues of its own.


Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Jan Lukavský
I think that this approach breaks the assumption that bundles are 
executed as immutable pieces of work. This way, runners would have to 
update the bundle while executing it. It is another possible option, but 
seems to have issues of its own.


On 6/26/19 12:28 PM, Robert Bradshaw wrote:

Another option, that is nice from an API perspective but places a
burden on SDK implementers (and possibly runners), is to maintain the
ordering of timers by requiring timers to be fired in order and, if
any timers are set, to fire them immediately before processing later
timers. In other words, if T1 sets T2 and modifies T3, these would
take effect (locally, the runner may not even know about T2) before T3
was processed.




Re: [DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Robert Bradshaw
Another option, that is nice from an API perspective but places a
burden on SDK implementers (and possibly runners), is to maintain the
ordering of timers by requiring timers to be fired in order and, if
any timers are set, to fire them immediately before processing later
timers. In other words, if T1 sets T2 and modifies T3, these would
take effect (locally, the runner may not even know about T2) before T3
was processed.
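
A sketch of this in-order strategy, again with Long timestamps standing
in for timers (illustrative only, not runner or SDK code): the SDK keeps
the bundle's timers in a priority queue and pushes locally-set timers
into the same queue, so T2 (set while firing T1) fires before T3.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

class OrderedTimerLoop {
  static List<Long> fireAll(Collection<Long> bundleTimers, Function<Long, List<Long>> onTimer) {
    PriorityQueue<Long> queue = new PriorityQueue<>(bundleTimers);
    List<Long> firedOrder = new ArrayList<>();
    while (!queue.isEmpty()) {
      long t = queue.poll();
      firedOrder.add(t);
      queue.addAll(onTimer.apply(t)); // newly set timers join this bundle's queue
    }
    return firedOrder; // for {1, 3} where firing 1 sets 2: [1, 2, 3]
  }
}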



[DISCUSS] Solving timer ordering on immutable bundles

2019-06-26 Thread Jan Lukavský

Hi,

I have mentioned an issue I have come across [1] on several other 
threads, but it probably didn't attract the attention it deserves.


I will try to restate the problem here for clarity:

 - on runners that use the concept of bundles (the original issue mentions 
DirectRunner, but it will probably apply for other runners, which use 
bundles, as well), the workflow is as follows:


  a) process elements in bundle

  b) advance watermark

  c) process timers

  d) continue to next bundle

 - the issue with this is that when we are initially at time T0 and set 
two timers for T1 and T3, then advance the watermark to T3 (or beyond), 
the timers will fire (correctly) in order T1, T3; but if the timer at T1 
sets another timer for T2, that timer will be fired in the next bundle 
(and therefore after T3; the sketch after these notes illustrates this)


 - this causes issues mostly with race conditions between window GC 
timers and user timers (and users don't have any way to solve that!)


 - note that the same applies when one timer tries to reset a timer that 
is already in the current bundle
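
A minimal, single-key simulation of the workflow above (illustrative
only, not runner code) that shows the reordering:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public class TimerOrderingDemo {
  public static void main(String[] args) {
    NavigableSet<Long> timers = new TreeSet<>(Arrays.asList(1L, 3L)); // T1, T3 set at T0
    long watermark = 3;                                               // advanced to T3
    List<Long> fired = new ArrayList<>();
    while (true) {
      // c) process the timers eligible in *this* bundle; timers set while
      // firing are only picked up by the *next* bundle
      List<Long> eligible = new ArrayList<>(timers.headSet(watermark, true));
      if (eligible.isEmpty()) {
        break;
      }
      timers.removeAll(eligible);
      for (long t : eligible) {
        fired.add(t);
        if (t == 1) {
          timers.add(2L); // the timer at T1 sets a timer for T2
        }
      }
    }
    System.out.println(fired); // prints [1, 3, 2] -- T2 fires after T3
  }
}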


I have investigated a way of solving this by running timers only for a 
single timestamp (instant) at each bundle, but as Reuven pointed out, 
that could regress performance (mostly by delaying the firing of timers 
that could already have fired). Options I see:


 1) either set the OnTimerContext#timestamp() to the current input 
watermark (not the time the user actually set the timer), or


 2) add OnTimerContext#getCurrentInputWatermark() and disallow setting 
(or resetting) timers for times between OnProcessContext#timestamp and 
OnProcessContext#getCurrentInputWatermark(), by throwing an exception


 3) any other option?

Option 1) seems to be broken by design, as it can result in corrupt data 
(emitted with a wrong, even somewhat arbitrary, timestamp); I'm 
including it just for completeness. Option 2) is a breaking change that 
can result in Pipeline failures (although the failures will happen on 
Pipelines that are probably already broken).
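
For illustration, a sketch of the check that option 2) implies; the
watermark accessor is hypothetical, since nothing like
getCurrentInputWatermark() exists in the API today:

import org.joda.time.Instant;

class CheckedTimerSet {
  // Rejects timer targets that fall between the element timestamp and the
  // (hypothetical) current input watermark, since they could no longer
  // fire in order.
  static void verifyTimerTarget(Instant target, Instant elementTs, Instant inputWatermark) {
    if (target.isAfter(elementTs) && !target.isAfter(inputWatermark)) {
      throw new IllegalArgumentException(
          "Timer for " + target + " lies between the element timestamp "
              + elementTs + " and the current input watermark " + inputWatermark);
    }
  }
}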


Although I have come up with a workaround in the work where I originally 
came across this issue, I think this is generally serious and should be 
dealt with, mostly because when using user-facing APIs there are no 
workarounds possible today.


Thanks for the discussion!

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7520



Re: Blogpost Beam Summit 2019

2019-06-26 Thread Robert Bradshaw
Thanks. This is a great write-up. +1 to an official tweet.

On Wed, Jun 26, 2019, 6:11 AM Reza Rokni  wrote:

> Thank you for putting this together!
>
> On Wed, 26 Jun 2019 at 01:23, Ahmet Altay  wrote:
>
>> Thank you for writing and sharing this. I enjoyed reading it :) I think
>> it is worth sharing it as a tweet [1] as well.
>>
>> [1] s.apache.org/beam-tweets
>>
>> On Tue, Jun 25, 2019 at 10:16 AM Valentyn Tymofieiev 
>> wrote:
>>
>>> Hi Juta,
>>>
>>> Thanks for sharing! You can also consider sending it to the user
>>> mailing list.
>>>
>>> Note that Datastore IO now supports Python 3:
>>> https://lists.apache.org/thread.html/0a1fdb9b6b42b08a82eebf3b5b7898893ca236b5d3bb5c4751664034@%3Cuser.beam.apache.org%3E
>>> .
>>>
>>> Thanks,
>>> Valentyn
>>>
>>> On Tue, Jun 25, 2019 at 7:34 AM Juta Staes  wrote:
>>>

 Hi all,

 First of all, a big thank you to the organizers and the speakers of the
 Beam Summit from last week. I had a great time and learned a lot!

 I wrote a blogpost about the main topics that were covered during the
 summit:
 https://blog.ml6.eu/learnings-from-beam-summit-europe-2019-8d115900f1ee
 Any thoughts and feedback are welcome.

 Kind regards,
 Juta
 --

 Juta Staes
 ML6 Gent
