Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-02-09 Thread Etienne Chauchot

Hi,

@JB: good to know for the roadmap! thanks

@Kenn: just to be clear: the timer fires fine. What I noticed is that it
seems to be SET more than once because timer.setForNowPlus is called in the
@ProcessElement method. I am not 100% sure of it, but it started to work
fine when I made sure to call timer.setForNowPlus only once. I'm not
saying it's a bug; this is just not what I understood when I read the
javadoc. I understood that it would be set only once per window, see the
javadoc below:


An implementation of Timer is implicitly scoped - it may be scoped to a 
key and window, or a key, window, and trigger, etc.
A timer exists in one of two states: set or unset. A timer can be set 
only for a single time per scope.


I use the DirectRunner.

For the key part: ok, makes sense.

Thanks for your comments

I'm leaving on vacation tonight for a little more than two weeks. I'll
resume work then, and maybe start a PR when it's ready.


Etienne



On 08/02/2017 at 19:48, Kenneth Knowles wrote:

Hi Etienne,

If the timer is firing n times for n elements, that's a bug in the runner /
shared runner code. It should be deduped. Which runner? Can you file a JIRA
against me to investigate? I'm still in the process of fleshing out more
and more RunnableOnService (aka ValidatesRunner) tests, so I will surely add
one. (Existing tests already OOMed without deduping, so it wasn't at the top
of my priority list.)

If the end user doesn't have a natural key, I would just add one and remove
it within your transform. Not sure how easy this will be - you might need
user intervention. Of course, you still do need to shard or you'll be
processing the whole PCollection serially.
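To make the suggestion concrete, here is a toy, stdlib-only Java model (not Beam code) of adding a synthetic key, batching per key, and dropping the key again. `ShardedBatching`, `shardKey`, and the hash-based key choice are hypothetical; in a real pipeline the per-key buffers would be per-key state in a stateful DoFn, and leftovers would be flushed by a timer rather than at the end of a list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Beam code) of "add a key, batch per key, drop the key".
 * The per-key buffers stand in for per-key state in a stateful DoFn.
 */
final class ShardedBatching {
  /** Hypothetical helper: derive a synthetic shard key from the element's hash. */
  static int shardKey(Object element, int numShards) {
    return Math.floorMod(element.hashCode(), numShards);
  }

  /** Batches of at most batchSize elements; a batch never mixes shard keys. */
  static <T> List<List<T>> batch(List<T> input, int numShards, int batchSize) {
    Map<Integer, List<T>> buffers = new LinkedHashMap<>();
    List<List<T>> batches = new ArrayList<>();
    for (T element : input) {
      // 1. Add a synthetic key so per-key state has something to be scoped to.
      int key = shardKey(element, numShards);
      List<T> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
      // 2. Buffer per key and emit a batch as soon as it is full.
      buffer.add(element);
      if (buffer.size() == batchSize) {
        batches.add(new ArrayList<>(buffer));
        buffer.clear();
      }
    }
    // Flush leftovers in first-seen key order (in a stateful DoFn, a timer's job).
    for (List<T> buffer : buffers.values()) {
      if (!buffer.isEmpty()) {
        batches.add(new ArrayList<>(buffer));
      }
    }
    // 3. Drop the keys: callers only ever see the batches.
    return batches;
  }
}
```

With `numShards = 1` every element maps to the same key, which is the serial processing warned about above; more shards means more keys that can be processed in parallel.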

Kenn

On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré wrote:


Hi

AFAIR, the timer per function is on the "roadmap" (from a discussion
we had with Kenn).

I will take a deeper look at your branch next week.

Regards
JB

On Feb 8, 2017, 13:28, at 13:28, Etienne Chauchot wrote:

Hi Kenn,

I have started using the state and timer APIs; they seem awesome!

Please take a look at
https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO

It contains a PTransform that does the batching across bundles while
respecting the windows (the tests are not finished yet; see @Ignore
and TODOs).

I have some questions:

- I use the timer to detect the end of the window, as you suggested.
But the timer can only be set in @ProcessElement and @OnTimer. The javadoc
says that timers are implicitly scoped to a key/window and that a timer
can be set only for a single time per scope. I noticed that if I call
timer.setForNowPlus in the @ProcessElement method, the timer seems to be
set n times for n elements. So I just created a boolean state to prevent
setting the timer more than once per key/window.
=> Would it be good to have an end-user way of indicating that the
timer will be set only once per key/window? Something analogous to
@Setup, to avoid the user having to use a boolean state?

- I understand that state and timers need to be per-key, but what if the
end user does not need a key (let's say they just need a PCollection of
unkeyed elements)? Then, do we tell them to use a keyed PCollection anyway,
as I wrote in the javadoc of BatchingParDo?
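A toy model (plain Java, not the Beam Timer API) of the behavior described in the first question: if every element re-arms a relative timer ("now plus delay"), the scope still holds a single timer, but its firing time keeps moving; a per-scope boolean flag, like the boolean state workaround, arms it only once. `TimerScopeModel`, `setRelative`, and the "key|window" scope string are hypothetical stand-ins, and the assumption that a second set overwrites the first is a modeling choice, not a statement about any runner.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of one timer per (key, window) scope; not the Beam Timer API. */
final class TimerScopeModel {
  private final Map<String, Long> firingTimes = new HashMap<>(); // one timer per scope
  private final Map<String, Boolean> timerSetFlag = new HashMap<>(); // models the boolean state
  private int setCalls = 0;

  private static String scope(String key, String window) {
    return key + "|" + window;
  }

  /** Arms the scope's timer at now + delay; a later call overwrites the earlier time. */
  private void setRelative(String scope, long now, long delay) {
    firingTimes.put(scope, now + delay);
    setCalls++;
  }

  /** Naive element handler: re-arms the timer for every element. */
  void processNaive(String key, String window, long now, long delay) {
    setRelative(scope(key, window), now, delay);
  }

  /** Guarded element handler: arms the timer only for the first element in the scope. */
  void processGuarded(String key, String window, long now, long delay) {
    String s = scope(key, window);
    if (!timerSetFlag.getOrDefault(s, false)) {
      setRelative(s, now, delay);
      timerSetFlag.put(s, true);
    }
  }

  /** Current firing time for the scope, or null if its timer was never set. */
  Long firingTime(String key, String window) {
    return firingTimes.get(scope(key, window));
  }

  int setCalls() {
    return setCalls;
  }
}
```

For three elements at processing times 0, 10 and 20 with a delay of 100, the naive handler ends with a firing time of 120 after three sets, while the guarded one keeps 100 after a single set.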

WDYT?

Thanks,

Etienne


On 26/01/2017 at 17:28, Etienne Chauchot wrote:

Wonderful!

Thanks Kenn!

Etienne


On 26/01/2017 at 15:34, Kenneth Knowles wrote:

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding with a
TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
precisely. When this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while the
output timestamp is held to this point (so you can safely output into the
window).
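The timing argument above can be checked with a toy model in plain Java (not Beam code; `WindowExpiryModel` and its methods are hypothetical, and timestamps are bare milliseconds). The timer time is the window's max timestamp plus the allowed lateness; once the input watermark passes it, the buffered batch is flushed once, and any element arriving afterwards for that window is treated as droppable. The output-timestamp hold is not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of flushing a per-window buffer at window end + allowed lateness. */
final class WindowExpiryModel {
  private final long expiryTimerTime;
  private final List<String> buffer = new ArrayList<>();
  private boolean fired = false;

  WindowExpiryModel(long windowMaxTimestamp, long allowedLateness) {
    // Mirrors window.maxTimestamp().plus(allowedLateness).
    this.expiryTimerTime = windowMaxTimestamp + allowedLateness;
  }

  long expiryTimerTime() {
    return expiryTimerTime;
  }

  /** Buffers an element; returns false if the window has already expired (droppable). */
  boolean onElement(String element, long inputWatermark) {
    if (fired || inputWatermark > expiryTimerTime) {
      return false;
    }
    buffer.add(element);
    return true;
  }

  /** Advances the input watermark; returns the flushed batch if the timer fires now. */
  List<String> advanceWatermark(long inputWatermark) {
    if (fired || inputWatermark <= expiryTimerTime) {
      return Collections.emptyList();
    }
    fired = true;
    List<String> batch = new ArrayList<>(buffer);
    buffer.clear();
    return batch;
  }
}
```

For a window whose max timestamp is 999 with an allowed lateness of 500, the timer time is 1499: an element seen at watermark 1200 is still buffered, nothing fires at 1499, and the batch is flushed once the watermark reaches 1500.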

@OnWindowExpiration is (1) a convenience to save you from needing a handle
on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence of
state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and
windowing configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot wrote:


Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.
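A toy model (plain Java, not Beam's DoFn) of why batching inside a plain ParDo runs into bundles: without state, the buffer only lives from the start of a bundle to its end, so a partial batch must be emitted at every bundle boundary. `BundleBatcher` is hypothetical; its `startBundle`/`processElement`/`finishBundle` methods only mirror the DoFn lifecycle by analogy.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of batching in a stateless DoFn: the buffer lives for one bundle only. */
final class BundleBatcher<T> {
  private final int batchSize;
  private final List<List<T>> emitted = new ArrayList<>();
  private List<T> buffer;

  BundleBatcher(int batchSize) {
    this.batchSize = batchSize;
  }

  void startBundle() {
    buffer = new ArrayList<>();
  }

  void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Must flush here: nothing in the buffer survives past the end of the bundle. */
  void finishBundle() {
    flush();
    buffer = null;
  }

  private void flush() {
    if (!buffer.isEmpty()) {
      emitted.add(buffer);
      buffer = new ArrayList<>();
    }
  }

  List<List<T>> emitted() {
    return emitted;
  }
}
```

With a batch size of 3, a bundle of two elements followed by a bundle of four yields batches of sizes 2, 3 and 1; the undersized batches at bundle boundaries are the motivation for keeping the buffer in state across bundles.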

I'm starting to deal with windows and bundles (I'm starting to look at
the State API to process across bundles; more questions about this to
come).
My comments/questions are inline:


On 17/01/2017 at 18:41, Ben Chambers wrote:


We should start by understanding the goals. If elements are in
different
windows can they be out in the same batch? If 

Re: Issue with Coder documentation regarding context

2017-02-09 Thread Kenneth Knowles
+1 totally agree that this is hardly documented and worth making clear.
File a JIRA?

Your write-up is nice, though I have a little to add: the context doesn't
even really indicate whether it is nested or not; we chose the wrong name.
It actually indicates whether this is the _last_ object in the stream, so
`isWholeStream` is a much better name. For example, in KvCoder [1] the key
is always encoded with the nested context, while the value is encoded with
the enclosing context because it is the last thing written.
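A stdlib-only sketch of that rule: anything followed by more bytes must be self-delimiting, so a KV-style coder always writes the key in nested form, while the value, being last, can use the enclosing context. `ToyKvStringCoder` and its 4-byte length prefix are hypothetical and do not reproduce Beam's actual byte formats; the `isWholeStream` flag here is a plain boolean, not `Coder.Context`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Toy KV coder illustrating nested vs. whole-stream encoding; not Beam's KvCoder. */
final class ToyKvStringCoder {
  /** Only nested (not whole-stream) encodings carry a length prefix. */
  static void encodeString(String s, OutputStream out, boolean isWholeStream) throws IOException {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    if (!isWholeStream) {
      new DataOutputStream(out).writeInt(bytes.length); // unbuffered, writes straight through
    }
    out.write(bytes);
  }

  static String decodeString(InputStream in, boolean isWholeStream) throws IOException {
    byte[] bytes;
    if (isWholeStream) {
      bytes = in.readAllBytes(); // safe only because nothing follows
    } else {
      DataInputStream data = new DataInputStream(in);
      bytes = new byte[data.readInt()];
      data.readFully(bytes);
    }
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Key always nested; value inherits the caller's context because it is last. */
  static byte[] encodeKv(String key, String value, boolean isWholeStream) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    encodeString(key, out, false);
    encodeString(value, out, isWholeStream);
    return out.toByteArray();
  }

  static String[] decodeKv(InputStream in, boolean isWholeStream) throws IOException {
    String key = decodeString(in, false);
    String value = decodeString(in, isWholeStream);
    return new String[] {key, value};
  }
}
```

Encoding ("k", "value") takes 10 bytes as a whole stream and 14 bytes when nested: the only difference is the value's length prefix.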

We should also emphasize the use of CoderProperties as the best way to test
a coder. It will test it in multiple contexts, including when there are
extra bytes, if I recall correctly.
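In the spirit of that check (this is not `CoderProperties` itself; `RoundTripCheck`, `Codec`, and both sample coders are hypothetical), a nested round-trip test can append trailing bytes after the encoded value and verify that decoding returns the value and leaves those bytes unread. A coder that reads to the end of the stream fails it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Toy nested round-trip check; not the actual CoderProperties implementation. */
final class RoundTripCheck {
  /** Hypothetical minimal coder shape, for the nested (not whole-stream) case only. */
  interface Codec<T> {
    void encode(T value, OutputStream out) throws IOException;

    T decode(InputStream in) throws IOException;
  }

  // Stand-in for whatever an enclosing coder writes after this value.
  private static final byte[] TRAILER = {42, 43, 44};

  /** True if the value survives and the trailing bytes are left unread. */
  static <T> boolean nestedRoundTripOk(Codec<T> codec, T value) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    codec.encode(value, out);
    out.write(TRAILER);
    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    T decoded = codec.decode(in);
    byte[] rest = in.readAllBytes();
    return value.equals(decoded) && Arrays.equals(rest, TRAILER);
  }

  /** Reads to end of stream: only correct when nothing follows the value. */
  static final Codec<String> READ_TO_END =
      new Codec<String>() {
        @Override
        public void encode(String value, OutputStream out) throws IOException {
          out.write(value.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public String decode(InputStream in) throws IOException {
          return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
      };

  /** One-byte length prefix (toy: values up to 255 UTF-8 bytes), then the bytes. */
  static final Codec<String> LENGTH_PREFIXED =
      new Codec<String>() {
        @Override
        public void encode(String value, OutputStream out) throws IOException {
          byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
          out.write(bytes.length);
          out.write(bytes);
        }

        @Override
        public String decode(InputStream in) throws IOException {
          byte[] bytes = new byte[in.read()];
          in.readNBytes(bytes, 0, bytes.length);
          return new String(bytes, StandardCharsets.UTF_8);
        }
      };
}
```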

File a JIRA?

Kenn

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java#L89

On Thu, Feb 9, 2017 at 10:12 AM, Aviem Zur  wrote:

> Hi,
>
> I think improvements can be made to the documentation of `encode` and
> `decode` methods in `Coder`.
>
> A coder may be used to encode/decode several objects using a single stream,
> so you cannot assume that the stream the coder encodes to/decodes from only
> contains bytes representing a single object. This happens, for example, when
> the coder is used in an `IterableCoder`, such as in `GroupByKey`.
>
> When implementing a coder this needs to be taken into account.
>
> The `context` argument in `encode` and `decode` methods provides the
> necessary information.
>
> The existing documentation for these methods does not seem to cover this.
> If users are not aware of this when implementing these methods, it can cause
> errors or skewed results.
>
> See:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L126
> and:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L137
>
> This is partially addressed in the documentation of the static `Context`
> values:
> `OUTER`:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L72
> and `NESTED`:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L79
>
> However, I think that the documentation of `encode` and `decode` should
> explain this concept clearly, to avoid confusing users implementing coders.
>


Issue with Coder documentation regarding context

2017-02-09 Thread Aviem Zur
Hi,

I think improvements can be made to the documentation of `encode` and
`decode` methods in `Coder`.

A coder may be used to encode/decode several objects using a single stream,
so you cannot assume that the stream the coder encodes to/decodes from only
contains bytes representing a single object. This happens, for example, when
the coder is used in an `IterableCoder`, such as in `GroupByKey`.

When implementing a coder this needs to be taken into account.

The `context` argument in `encode` and `decode` methods provides the
necessary information.

The existing documentation for these methods does not seem to cover this.
If users are not aware of this when implementing these methods, it can cause
errors or skewed results.
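A toy illustration of that failure in plain Java (not Beam's `IterableCoder`; `SharedStreamDemo` and its methods are hypothetical): an iterable-style encoding writes a count and then every element into one stream. An element coder that reads "to the end of the stream" swallows its neighbors, while a length-prefixed (nested) element coder reads back exactly what was written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of several elements sharing one stream; not Beam's IterableCoder. */
final class SharedStreamDemo {
  /** Context-unaware element coder: raw bytes out, reads to end of stream on the way in. */
  static void encodeNaive(String s, DataOutputStream out) throws IOException {
    out.write(s.getBytes(StandardCharsets.UTF_8));
  }

  static String decodeNaive(DataInputStream in) throws IOException {
    return new String(in.readAllBytes(), StandardCharsets.UTF_8);
  }

  /** Nested-aware element coder: length prefix, so it reads only its own bytes. */
  static void encodeNested(String s, DataOutputStream out) throws IOException {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  static String decodeNested(DataInputStream in) throws IOException {
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Writes a count and every element into one stream, then decodes them back. */
  static List<String> roundTrip(List<String> values, boolean nestedAware) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(values.size());
    for (String v : values) {
      if (nestedAware) {
        encodeNested(v, out);
      } else {
        encodeNaive(v, out);
      }
    }
    out.flush();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    int count = in.readInt();
    List<String> decoded = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      decoded.add(nestedAware ? decodeNested(in) : decodeNaive(in));
    }
    return decoded;
  }
}
```

With the input ["ab", "cd"], the naive decoder returns ["abcd", ""] because the first read consumes both elements, while the length-prefixed version returns the input unchanged.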

See:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L126
and:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L137

This is partially addressed in the documentation of the static `Context`
values:
`OUTER`:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L72
and `NESTED`:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L79

However, I think that the documentation of `encode` and `decode` should
explain this concept clearly, to avoid confusing users implementing coders.