Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-07-11 Thread Etienne Chauchot

Yes, there is now a new PTransform called GroupIntoBatches.

Best,

Etienne


On 11/07/2017 at 02:38, Robert Bradshaw wrote:

Sorry, just saw https://github.com/apache/beam/pull/2211

On Mon, Jul 10, 2017 at 5:37 PM, Robert Bradshaw  wrote:

Any progress on this?

On Thu, Mar 9, 2017 at 1:43 AM, Etienne Chauchot  wrote:

Hi all,

Kenn and I had a discussion yesterday about point 1 below; I would like
to note it here on the ML:

Using the new method timer.set() instead of timer.setForNowPlus() makes the
timer fire at the right time.

Another thing, regarding point 2: if I inject the window into the @OnTimer
method and print it, I see that at the moment the timer fires (at the last
timestamp of the window), the window is the GlobalWindow. I guess that is
because the fixed window has just ended. Maybe the empty bagState that I get
here is due to the end of the window (passing to the GlobalWindow). I mean, as
state is scoped per window, and the window is now different, another
bagState instance gets injected; hence the empty bagState.

WDYT?

I will open a PR even if this work is not finished yet, that way, we will
have a convenient environment for discussing this code.

Etienne


On 03/03/2017 at 11:48, Etienne Chauchot wrote:

Hi all,

@Kenn: I have enhanced my streaming test in
https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO in particular
to check that BatchingParDo doesn't mess up windows. It seems that it
actually does :)

The input collection contains 10 elements timestamped at 1 s intervals, and
it is divided into fixed windows of 5 s duration (so 2 windows). startTime is
epoch. The timer is used to detect the end of the window and then output the
content of the batch (buffer).
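As an aside, the timer timestamps in the logs follow directly from fixed-window arithmetic: a window [start, start + size) contains timestamps only up to start + size - 1 ms. A minimal plain-Java sketch of that arithmetic (illustrative names, not the Beam API):

```java
// Plain-Java sketch (not the Beam API) of fixed-window assignment and the
// "end of window" timestamp a timer would target. With 5 s windows starting
// at epoch, the first window's last timestamp is 4999 ms and the second's is
// 9999 ms, matching the delays shown in the logs.
public class FixedWindowSketch {
    // Start of the fixed window containing the given timestamp (ms since epoch).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Last timestamp that still belongs to the window [start, start + size).
    static long windowMaxTimestamp(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs - 1;
    }

    public static void main(String[] args) {
        long sizeMs = 5_000;
        // Ten elements timestamped at 1 s intervals, as in the test described above.
        for (long t = 0; t < 10_000; t += 1_000) {
            System.out.printf("element @%d ms -> window [%d..%d), timer at %d ms%n",
                t, windowStart(t, sizeMs), windowStart(t, sizeMs) + sizeMs,
                windowMaxTimestamp(t, sizeMs));
        }
    }
}
```

So a correct second firing would target 9999 ms, which is why the repeated 4999 ms firing below looks wrong.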

I added some logs and I noticed two strange things (that might be linked):


1 - The timer is set twice, and it is set correctly:

INFOS: * SET TIMER * Delay of 4999 ms added to timestamp
1970-01-01T00:00:00.000Z set for window
[1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)

INFOS: * SET TIMER * Delay of 4999 ms added to timestamp
1970-01-01T00:00:05.000Z set for window
[1970-01-01T00:00:05.000Z..1970-01-01T00:00:10.000Z)

It fires twice, but not at the right timestamp:

INFOS: * END OF WINDOW * for timer timestamp
1970-01-01T00:00:04.999Z

=>Correct

INFOS: * END OF WINDOW * for timer timestamp
1970-01-01T00:00:04.999Z

=> Incorrect (should fire at timestamp 1970-01-01T00:00:09.999Z)

Do I need to call timer.cancel() after the timer has fired? But
timer.cancel() is not supported by the DirectRunner yet.



2 - In the @OnTimer method, the injected batch bagState parameter is empty,
whereas elements were added to it since the last batch.clear() while processing
the same window:

INFOS: * BATCH * clear

INFOS: * BATCH * Add element for window
[1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)

INFOS: * BATCH * Add element for window
[1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
..
INFOS: * END OF WINDOW * for timer timestamp
1970-01-01T00:00:04.999Z
INFOS: * IN ONTIMER * batch size 0

Am I doing something wrong with timers, or is there something not totally
finished with them (as you noticed, they are quite new)?

WDYT?


Thanks

Etienne


On 09/02/2017 at 09:55, Etienne Chauchot wrote:

Hi,

@JB: good to know for the roadmap! thanks

@Kenn: just to be clear: the timer fires fine. What I noticed is that it
seems to be SET more than once because timer.setForNowPlus is called in the
@ProcessElement method. I am not 100% sure of it; what I noticed is that it
started to work fine once I ensured timer.setForNowPlus was called only once. I
don't say it's a bug, it's just not what I understood when I read the
javadoc: I understood that it would be set only once per window, see the
javadoc below:

An implementation of Timer is implicitly scoped - it may be scoped to a
key and window, or a key, window, and trigger, etc.
A timer exists in one of two states: set or unset. A timer can be set
only for a single time per scope.
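Read that way, the quoted contract implies overwrite semantics: re-setting a timer within the same (key, window) scope replaces the pending firing time rather than registering a second firing. A plain-Java model of that contract (not the Beam implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the quoted javadoc contract (not the Beam runtime):
// a timer is scoped to (key, window), and setting it again within the same
// scope replaces the pending firing time instead of adding a second firing.
public class TimerScopeSketch {
    private final Map<String, Long> firingTimeByScope = new HashMap<>();

    void set(String key, String window, long firingTimeMs) {
        // Last set wins: each scope maps to at most one pending firing time.
        firingTimeByScope.put(key + "/" + window, firingTimeMs);
    }

    int pendingCount() {
        return firingTimeByScope.size();
    }

    Long firingTime(String key, String window) {
        return firingTimeByScope.get(key + "/" + window);
    }

    public static void main(String[] args) {
        TimerScopeSketch timers = new TimerScopeSketch();
        // Setting from @ProcessElement for five elements of the same
        // key/window still leaves a single pending timer.
        for (int i = 0; i < 5; i++) {
            timers.set("k", "[0..5000)", 4_999);
        }
        System.out.println("pending timers: " + timers.pendingCount());  // prints pending timers: 1
    }
}
```

Under these semantics, setting the timer once per element would be harmless; the n-firings behavior observed earlier would then be a runner bug, as Kenn suggests below.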

I use the DirectRunner.

For the key part: ok, makes sense.

Thanks for your comments

I'm leaving on vacation tonight for a little more than two weeks; I'll
resume work then and maybe start a PR when it's ready.

Etienne



On 08/02/2017 at 19:48, Kenneth Knowles wrote:

Hi Etienne,

If the timer is firing n times for n elements, that's a bug in the runner /
shared runner code. It should be deduped. Which runner? Can you file a JIRA
against me to investigate? I'm still in the process of fleshing out more
and more RunnableOnService (aka ValidatesRunner) tests so I will surely add
one (existing tests already OOMed without deduping, so it wasn't at the top
of my priority list)

If the end user doesn't have a natural key, I would just add one and remove
it within your transform. Not sure how easy this will be - you might need
user intervention. Of course, you still do need to shard or you'll 



Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-02-09 Thread Etienne Chauchot

Hi,

@JB: good to know for the roadmap! thanks

@Kenn: just to be clear: the timer fires fine. What I noticed is that it
seems to be SET more than once because timer.setForNowPlus is called in the
@ProcessElement method. I am not 100% sure of it; what I noticed is that it
started to work fine once I ensured timer.setForNowPlus was called only once.
I don't say it's a bug, it's just not what I understood when I read the
javadoc: I understood that it would be set only once per window, see the
javadoc below:


An implementation of Timer is implicitly scoped - it may be scoped to a 
key and window, or a key, window, and trigger, etc.
A timer exists in one of two states: set or unset. A timer can be set 
only for a single time per scope.


I use the DirectRunner.

For the key part: ok, makes sense.

Thanks for your comments

I'm leaving on vacation tonight for a little more than two weeks; I'll
resume work then and maybe start a PR when it's ready.


Etienne



On 08/02/2017 at 19:48, Kenneth Knowles wrote:

Hi Etienne,

If the timer is firing n times for n elements, that's a bug in the runner /
shared runner code. It should be deduped. Which runner? Can you file a JIRA
against me to investigate? I'm still in the process of fleshing out more
and more RunnableOnService (aka ValidatesRunner) tests so I will surely add
one (existing tests already OOMed without deduping, so it wasn't at the top
of my priority list)

If the end user doesn't have a natural key, I would just add one and remove
it within your transform. Not sure how easy this will be - you might need
user intervention. Of course, you still do need to shard or you'll be
processing the whole PCollection serially.

Kenn
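Kenn's point about adding a synthetic key and sharding could be sketched like this (plain Java; the hash-modulo scheme is an illustrative assumption, not something Beam prescribes):

```java
// Sketch of the sharding idea above: when the input has no natural key,
// assign a synthetic shard key before the stateful batching step and drop it
// afterwards, so the work is spread over numShards parallel keys instead of
// the whole PCollection being processed serially under one key.
public class ShardKeySketch {
    static int shardKey(Object element, int numShards) {
        // floorMod keeps the shard in [0, numShards) even for negative hash codes.
        return Math.floorMod(element.hashCode(), numShards);
    }

    public static void main(String[] args) {
        int numShards = 4;
        for (String e : new String[] {"a", "b", "c", "d"}) {
            System.out.println(e + " -> shard " + shardKey(e, numShards));
        }
    }
}
```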

On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré  wrote:


Hi

AFAIR the timer per function is in the "roadmap" (remembering discussion
we had with Kenn).

I will take a deeper look next week on your branch.

Regards
JB

On Feb 8, 2017, at 13:28, Etienne Chauchot  wrote:

Hi Kenn,

I have started using state and timer APIs, they seem awesome!

Please take a look at
https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO

It contains a PTransform that does the batching trans-bundles and
respecting the windows (even if tests are not finished yet, see @Ignore

and TODOs)

I have some questions:

- I use the timer to detect the end of the window like you suggested.
But the timer can only be set in @ProcessElement and @OnTimer. The javadoc
says that timers are implicitly scoped to a key/window and that a timer
can be set only for a single time per scope. I noticed that if I call
timer.setForNowPlus in the @ProcessElement method, the timer seems to be
set n times for n elements. So I just created a boolean state to prevent
setting the timer more than once per key/window.
=> Would it be good maybe to have an end-user way of indicating that the
timer will be set only once per key/window? Something analogous to
@Setup, to avoid the user having to keep a boolean state?

- I understand that state and timers need to be per-key, but what if the end
user does not need a key (let's say he just needs an unkeyed PCollection)?
Then, do we tell him to use a keyed PCollection anyway, like I wrote in the
javadoc of BatchingParDo?
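The boolean-state workaround from the first question above can be sketched in plain Java (illustrative names, not the Beam state API):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the workaround described above: a per-(key, window)
// boolean "timer already set" flag, so that a set issued from every element
// in @ProcessElement only takes effect once per scope. Names are
// illustrative, not the Beam API.
public class TimerGuardSketch {
    private final Map<String, Boolean> timerSet = new HashMap<>();
    int timerSetCalls = 0;  // stand-in for actual timer.setForNowPlus(...) calls

    void processElement(String key, String window) {
        String scope = key + "/" + window;
        if (!timerSet.getOrDefault(scope, false)) {  // guard: only the first element sets it
            timerSetCalls++;
            timerSet.put(scope, true);
        }
    }

    public static void main(String[] args) {
        TimerGuardSketch fn = new TimerGuardSketch();
        for (int i = 0; i < 6; i++) {  // six elements of one key/window
            fn.processElement("k", "[0..60000)");
        }
        System.out.println("timer set calls: " + fn.timerSetCalls);  // prints timer set calls: 1
    }
}
```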

WDYT?

Thanks,

Etienne


On 26/01/2017 at 17:28, Etienne Chauchot wrote:

Wonderful !

Thanks Kenn !

Etienne


On 26/01/2017 at 15:34, Kenneth Knowles wrote:

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding with a
TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
precisely - when this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while the
output timestamp is held to this point (so you can safely output into the
window).

@OnWindowExpiration is (1) a convenience to save you from needing a handle
on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence of
state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and windowing
configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot  wrote:


Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at
the State API to process trans-bundles; more questions about this to come).
My comments/questions are inline:


On 17/01/2017 at 18:41, Ben Chambers wrote:


We should start by understanding the goals. If elements are in different
windows, can they be put in the same batch? If 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-30 Thread Etienne Chauchot

Hi,

On 27/01/2017 at 19:44, Robert Bradshaw wrote:

On Fri, Jan 27, 2017 at 6:55 AM, Etienne Chauchot  wrote:

Hi Robert,

On 26/01/2017 at 18:17, Robert Bradshaw wrote:

First off, let me say that a *correctly* batching DoFn is a lot of
value, especially because it's (too) easy to (often unknowingly)
implement it incorrectly.

I definitely agree; I put a similar comment in another email. As an example,
I recall a comment from someone on Stack Overflow who said that he would have
forgotten to flush the batch in finishBundle.
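That pitfall is easy to demonstrate: a size-based batcher must also flush the partially filled buffer at the end (in Beam terms, in @FinishBundle), or trailing elements are silently dropped. A plain-Java sketch, not the Beam API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the bug mentioned above: a size-based batcher must flush the
// partially filled buffer at the end as well, or trailing elements are
// silently dropped. Plain Java, not the Beam API.
public class BatcherSketch {
    static List<List<Integer>> batchAll(List<Integer> input, int batchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (int e : input) {
            buffer.add(e);
            if (buffer.size() >= batchSize) {  // flush on size
                batches.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {  // the easily forgotten final flush
            batches.add(new ArrayList<>(buffer));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 10; i++) input.add(i);
        // 10 elements, batch size 4 -> batches of 4, 4, and 2.
        System.out.println(batchAll(input, 4));  // prints [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
    }
}
```

Without the final flush, elements 8 and 9 would never be emitted.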

My take is that a BatchingParDo should be a PTransform that takes a DoFn, ? extends
Iterable> as a parameter, as well as some (optional?) batching
criteria (probably batch size and/or batch timeout).

This is how I implemented it, plus another per-element function that produces
an intermediary type, to allow the user to use another type than InputType in
perBatchFn (e.g. convert elements to DTOs and call an external service in
perBatchFn using DTOs) or to do any other per-element computation before
adding elements to the batch.

I think we should omit the perElement as part of this transform as
that can be done immediately prior to this one without any loss of
generality or utility. One can always wrap this composition in a new
PTransform if desired.

You're right, it is simpler to let the user do it as a pipeline step; I'll
remove the perElementFn.

Besides I used SimpleFunctions

SimpleFunction perElementFn;
SimpleFunction perBatchFn;

The input ArrayList in perBatchFn is the buffer of elements.

We should be as general as possible, e.g. SimpleFunction, ? extends Iterable>.

Yes sure, I've updated it.

Again, letting
this be a DoFn rather than SimpleFunction allows for things such as
setup, teardown, side inputs, etc. but forces complicated delegation
so this is probably a fine start.


Yes, actually, I hesitated; I opted for the simpler option as a start :)
I guess, as the list of possible use cases grows, we might change to DoFn
to leverage its possibilities.

The DoFn should
map the set of inputs to a set of outputs of the same size and in the
same order as the input (or, possibly, an empty list would be
acceptable). Semantically, it should be defined as

public expand(PCollection input) {
return input
  .apply(e -> SingletonList.of(e))
  .apply(parDo(batchDoFn))
  .apply(es -> Iterables.onlyElement(es));
}

Getting this correct wrt timestamps and windowing is tricky. However,
even something that handles the most trivial case (e.g. GlobalWindows
only) and degenerates to batch sizes of 1 for other cases would allow
people to start using this code (rather than rolling their own) and we
could then continue to refine it.
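The contract Robert describes (one output per input, in input order, with batch size 1 as the degenerate case) can be checked with a plain-Java model; the times-ten batch function is an arbitrary illustrative stand-in:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of the contract described above: running a per-batch function over
// buffered chunks must be observationally equivalent to applying it to
// singleton batches, i.e. one output per input, in input order. Plain Java,
// not the Beam API.
public class BatchContractSketch {
    // Apply batchFn over chunks of the given size and concatenate the results.
    static List<Integer> viaBatches(List<Integer> input, int batchSize,
                                    Function<List<Integer>, List<Integer>> batchFn) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < input.size(); i += batchSize) {
            out.addAll(batchFn.apply(
                input.subList(i, Math.min(i + batchSize, input.size()))));
        }
        return out;
    }

    public static void main(String[] args) {
        Function<List<Integer>, List<Integer>> timesTen = batch -> {
            List<Integer> r = new ArrayList<>();
            for (int e : batch) r.add(e * 10);
            return r;
        };
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        // Batch size 2 and the degenerate batch size 1 must agree.
        System.out.println(viaBatches(input, 2, timesTen));  // prints [10, 20, 30, 40, 50]
        System.out.println(viaBatches(input, 1, timesTen));  // prints [10, 20, 30, 40, 50]
    }
}
```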

Yes, sure, right now the code handles only the global window case. This is
the very beginning; I'm still on the simple naive approach (no window and no
trans-bundle buffering support),

+1. We should assert on construction that the windowing is global.
Even in the global window case, we'll want to avoid mangling element
timestamps.


I plan to use the state API to buffer
trans-bundle and the timer API (as Kenn pointed out) to detect the end of the
window in the DoFn.

Makes sense. It'd be nice if we could figure out a way to do this
across keys (and windows, when the batch computation isn't sensitive
to this of course).


Thanks for your comments Robert.

Glad to help. Thanks for taking this on.

- Robert

Thanks for your comments

Etienne



Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-27 Thread Jean-Baptiste Onofré
It makes sense with the PTransform, less invasive, but the user has to
define two functions (one perElement, the other perBatch). I like the DoFn
approach with annotations, and I would do the same for the batch.


The "trigger/window" is a key part as well, as even if the batch is not 
complete, the perBatch Fn might be called.


Regards
JB

On 01/27/2017 03:55 PM, Etienne Chauchot wrote:

Hi Robert,

On 26/01/2017 at 18:17, Robert Bradshaw wrote:

First off, let me say that a *correctly* batching DoFn is a lot of
value, especially because it's (too) easy to (often unknowingly)
implement it incorrectly.

I definitely agree; I put a similar comment in another email. As an
example, I recall a comment from someone on Stack Overflow who said that he
would have forgotten to flush the batch in finishBundle.

My take is that a BatchingParDo should be a PTransform that takes a DoFn, ? extends
Iterable> as a parameter, as well as some (optional?) batching
criteria (probably batch size and/or batch timeout).

This is how I implemented it, plus another per-element function that
produces an intermediary type, to allow the user to use another type than
InputType in perBatchFn (e.g. convert elements to DTOs and call an
external service in perBatchFn using DTOs) or to do any other
per-element computation before adding elements to the batch.

Besides I used SimpleFunctions

SimpleFunction perElementFn;
SimpleFunction perBatchFn;

The input ArrayList in perBatchFn is the buffer of elements.

The DoFn should
map the set of inputs to a set of outputs of the same size and in the
same order as the input (or, possibly, an empty list would be
acceptable). Semantically, it should be defined as

public expand(PCollection input) {
  return input
    .apply(e -> SingletonList.of(e))
    .apply(parDo(batchDoFn))
    .apply(es -> Iterables.onlyElement(es));
}

Getting this correct wrt timestamps and windowing is tricky. However,
even something that handles the most trivial case (e.g. GlobalWindows
only) and degenerates to batch sizes of 1 for other cases would allow
people to start using this code (rather than rolling their own) and we
could then continue to refine it.

Yes, sure, right now the code handles only the global window case. This
is the very beginning; I'm still on the simple naive approach (no window
and no trans-bundle buffering support). I plan to use the state API to
buffer trans-bundle and the timer API (as Kenn pointed out) to detect the
end of the window in the DoFn.

Thanks for your comments Robert.

More responses inline below.

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot  wrote:

Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at
the State API to process trans-bundles; more questions about this to come).
My comments/questions are inline:

On 17/01/2017 at 18:41, Ben Chambers wrote:

We should start by understanding the goals. If elements are in different
windows, can they be put in the same batch? If they have different
timestamps, what timestamp should the batch have?

Regarding timestamps: the current design is as follows: the transform does
not group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user-defined function
(perBatchFn) that gets called when batchSize elements have been processed.
This function takes an ArrayList as a parameter, so elements keep their
original timestamps.

Correct, elements must keep their original timestamps. This is one
reason @OnWindowExpiration is insufficient. The watermark needs to be
held back to the timestamp of the earliest element in the buffer.


Regarding windowing: I guess that if elements are not in the same
window,
they are not expected to be in the same batch.

Batching should be possible across windows, as long as the
innerBatchDoFn does not take the Window (or window-dependent side
inputs) as parameters. Note in particular, if there is ever
non-trivial windowing, after a GBK each successive element is almost
certainly in a different window from its predecessor, which would make
emitting after each window change useless.


I'm just starting to work on these subjects, so I might lack a bit of
information;
what I am currently thinking about is that I need a way to know in
the DoFn
that the window has expired so that I can call the perBatchFn even if
batchSize is not reached.  This is the @OnWindowExpiration callback that
Kenneth mentioned in an email about bundles.
Let's imagine that we have a collection of elements artificially
timestamped
every 10 seconds (for simplicity of the example) and a fixed
windowing of 1
minute. Then each window contains 6 elements. If we were to buffer the
elements by batches of 5 elements, then for each window we expect to get 2
batches (one of 5 elements, one of 1 element). For that to happen, we need a
@OnWindowExpiration on the DoFn where we call perBatchFn

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-27 Thread Robert Bradshaw
On Fri, Jan 27, 2017 at 6:55 AM, Etienne Chauchot  wrote:
> Hi Robert,
>
> Le 26/01/2017 à 18:17, Robert Bradshaw a écrit :
>>
>> First off, let me say that a *correctly* batching DoFn is a lot of
>> value, especially because it's (too) easy to (often unknowingly)
>> implement it incorrectly.
>
> I definitely agree, I put a similar comment in another email. As an example
> I recall a comment of someone in stackoverflow who said that he would have
> forgotten to flush the batch in finishBundle.
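The pitfall mentioned here is easy to reproduce in a plain-Java sketch (no Beam dependency; the names mirror the discussion but are hypothetical): a buffer emits full batches as elements arrive, but the final partial batch is only emitted if the caller remembers to flush at end of bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SizeBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> perBatchFn;
    private final List<T> buffer = new ArrayList<>();

    SizeBatcher(int batchSize, Consumer<List<T>> perBatchFn) {
        this.batchSize = batchSize;
        this.perBatchFn = perBatchFn;
    }

    // Analogous to @ProcessElement: buffer, and fire when the batch is full.
    void add(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Analogous to @FinishBundle: without this call, trailing elements are lost.
    void flush() {
        if (!buffer.isEmpty()) {
            perBatchFn.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        SizeBatcher<Integer> batcher = new SizeBatcher<>(3, batches::add);
        for (int i = 1; i <= 7; i++) {
            batcher.add(i);
        }
        // Only two full batches so far; element 7 is still buffered and
        // would be silently dropped if we forgot the final flush.
        if (batches.size() != 2) throw new AssertionError();
        batcher.flush();
        System.out.println(batches); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

A shared, well-tested transform removes exactly this class of bug from user code.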
>>
>> My take is that a BatchingParDo should be a PTransform<PCollection<I>,
>> PCollection<O>> that takes a DoFn<Iterable<I>, ? extends
>> Iterable<O>> as a parameter, as well as some (optional?) batching
>> criteria (probably batch size and/or batch timeout).
>
> This is how I implemented it plus another perElement function that produces
> an intermediary type to allow the user to use another type than InputType in
> perBatchFn (e.g. convert elements to DTOs and call an external service in
> perBatchFn using DTOs) or to do any other per-element computation before
> adding elements to the batch.

I think we should omit the perElement as part of this transform as
that can be done immediately prior to this one without any loss of
generality or utility. One can always wrap this composition in a new
PTransform if desired.

> Besides I used SimpleFunctions
>
> SimpleFunction perElementFn;
> SimpleFunction perBatchFn;
>
> The input ArrayList in perBatchFn is the buffer of elements.

We should be as general as possible, e.g. SimpleFunction<Iterable<I>, ? extends Iterable<O>>. Again, letting
this be a DoFn rather than SimpleFunction allows for things such as
setup, teardown, side inputs, etc. but forces complicated delegation
so this is probably a fine start.

>> The DoFn should
>> map the set of inputs to a set of outputs of the same size and in the
>> same order as the input (or, possibly, an empty list would be
>> acceptable). Semantically, it should be defined as
>>
>> public PCollection<O> expand(PCollection<I> input) {
>>return input
>>  .apply(e -> SingletonList.of(e))
>>  .apply(parDo(batchDoFn))
>>  .apply(es -> Iterables.onlyElement(es));
>> }
>>
>> Getting this correct wrt timestamps and windowing is tricky. However,
>> even something that handles the most trivial case (e.g. GlobalWindows
>> only) and degenerates to batch sizes of 1 for other cases would allow
>> people to start using this code (rather than rolling their own) and we
>> could then continue to refine it.
>
> Yes sure, right now the code handles only the global window case. This is
> the very beginning, I'm still in the simple naive approach (no window and no
> buffering trans-bundle support),

+1. We should assert on construction that the windowing is global.
Even in the global window case, we'll want to avoid mangling element
timestamps.

> I plan to use state API to buffer
> trans-bundle and timer API (as Kenn pointed) to detect the end of the window
> in the DoFn.

Makes sense. It'd be nice if we could figure out a way to do this
across keys (and windows, when the batch computation isn't sensitive
to this of course).

> Thanks for your comments Robert.

Glad to help. Thanks for taking this on.

- Robert


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-27 Thread Etienne Chauchot

Hi,

Indeed, I did not want to be invasive in DoFn either. So I chose to 
implement it as a PTransform.


Please be aware that it is just the very beginning, it is still in the 
simple naive approach (no window and no buffering trans-bundle support), 
I plan to use state API to buffer trans-bundle and timer API (as Kenn 
pointed) to detect the end of the window in the DoFn.


Here is the branch, it is too early to show it but it could provide a 
base for discussions as Eugene said.


https://github.com/echauchot/beam/commits/BEAM-135-BATCHING-PARDO

take a look at BatchingParDo and BatchingParDoTest.

You will find a user snippet as pseudo code in the javadoc of BatchingParDo.

There is also client code in the test but it is not close to a real use case;
it just allows testing the inner DoFn.


Thanks guys.

Etienne


Le 27/01/2017 à 00:00, Robert Bradshaw a écrit :

On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
 wrote:

I don't think we should make batching a core feature of the Beam
programming model (by adding it to DoFn as this code snippet implies). I'm
reasonably sure there are less invasive ways of implementing it.

+1, either as a PTransform or a DoFn that
wraps/delegates to a DoFn.


On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
wrote:


Agree, I'm curious as well.

I guess it would be something like:

.apply(ParDo(new DoFn() {

 @Override
 public long batchSize() {
   return 1000;
 }

 @ProcessElement
 public void processElement(ProcessContext context) {
   ...
 }
}));

If batchSize (overridden by the user) returns a positive long, then DoFn can
batch with this size.

Regards
JB

On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:

Hi Etienne,

Could you post some snippets of how your transform is to be used in a
pipeline? I think that would make it easier to discuss on this thread and
could save a lot of churn if the discussion ends up leading to a

different

API.

On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot 
wrote:


Wonderful !

Thanks Kenn !

Etienne


Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding

with

a

TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for

window.maxTimestamp().plus(allowedLateness)

precisely - when this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while

the

output timestamp is held to this point (so you can safely output into

the

window).
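The timer arithmetic Kenn describes can be sketched with java.time stand-ins for Beam's BoundedWindow and Instant (plain Java, no Beam dependency; the helper names are hypothetical). In Beam, windows are half-open intervals [start, end), so maxTimestamp() is one millisecond before the window end, and the expiration timer fires at maxTimestamp().plus(allowedLateness).

```java
import java.time.Duration;
import java.time.Instant;

public class WindowExpirationTimer {
    // Beam convention: a fixed window covers [start, end), so the maximum
    // timestamp belonging to the window is end - 1ms.
    static Instant maxTimestamp(Instant windowStart, Duration windowSize) {
        return windowStart.plus(windowSize).minusMillis(1);
    }

    // The point at which the input watermark guarantees all new data for
    // this window is droppable, while the output hold still permits
    // emitting into the window.
    static Instant expirationTimer(Instant windowStart, Duration windowSize,
                                   Duration allowedLateness) {
        return maxTimestamp(windowStart, windowSize).plus(allowedLateness);
    }

    public static void main(String[] args) {
        // 1-minute fixed window starting at epoch, 10s allowed lateness.
        Instant fires = expirationTimer(Instant.EPOCH,
                Duration.ofMinutes(1), Duration.ofSeconds(10));
        if (!fires.equals(Instant.ofEpochMilli(69_999L))) {
            throw new AssertionError();
        }
        System.out.println(fires.toEpochMilli()); // prints 69999
    }
}
```

This also shows why Etienne saw the GlobalWindow in @OnTimer earlier in the thread: a timer set at exactly the window's last timestamp fires only once the window itself is over.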

@OnWindowExpiration is (1) a convenience to save you from needing a

handle

on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence

of

state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and

windowing

configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-27 Thread Etienne Chauchot

Hi Robert,

Le 26/01/2017 à 18:17, Robert Bradshaw a écrit :

First off, let me say that a *correctly* batching DoFn is a lot of
value, especially because it's (too) easy to (often unknowingly)
implement it incorrectly.
I definitely agree, I put a similar comment in another email. As an 
example I recall a comment of someone in stackoverflow who said that he 
would have forgotten to flush the batch in finishBundle.

My take is that a BatchingParDo should be a PTransform<PCollection<I>,
PCollection<O>> that takes a DoFn<Iterable<I>, ? extends
Iterable<O>> as a parameter, as well as some (optional?) batching
criteria (probably batch size and/or batch timeout).
This is how I implemented it plus another perElement function that 
produces an intermediary type to allow the user to use another type than 
InputType in perBatchFn (e.g. convert elements to DTOs and call an 
external service in perBatchFn using DTOs) or to do any other 
per-element computation before adding elements to the batch.


Besides I used SimpleFunctions

SimpleFunction perElementFn;
SimpleFunction perBatchFn;

The input ArrayList in perBatchFn is the buffer of elements.

The DoFn should
map the set of inputs to a set of outputs of the same size and in the
same order as the input (or, possibly, an empty list would be
acceptable). Semantically, it should be defined as

public PCollection<O> expand(PCollection<I> input) {
   return input
 .apply(e -> SingletonList.of(e))
 .apply(parDo(batchDoFn))
 .apply(es -> Iterables.onlyElement(es));
}

Getting this correct wrt timestamps and windowing is tricky. However,
even something that handles the most trivial case (e.g. GlobalWindows
only) and degenerates to batch sizes of 1 for other cases would allow
people to start using this code (rather than rolling their own) and we
could then continue to refine it.
Yes sure, right now the code handles only the global window case. This 
is the very beginning, I'm still in the simple naive approach (no window 
and no buffering trans-bundle support), I plan to use state API to 
buffer trans-bundle and timer API (as Kenn pointed) to detect the end of 
the window in the DoFn.


Thanks for your comments Robert.

More responses inline below.

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot  wrote:

Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at
the State API to process trans-bundles, more questions about this to come).
My comments/questions are inline:

Le 17/01/2017 à 18:41, Ben Chambers a écrit :

We should start by understanding the goals. If elements are in different
windows can they be put in the same batch? If they have different
timestamps what timestamp should the batch have?

Regarding timestamps: the current design is as follows: the transform does not
group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user defined function
(perBatchFn) that gets called when batchSize elements have been processed.
This function takes an ArrayList as parameter. So elements keep their
original timestamps

Correct, elements must keep their original timestamps. This is one
reason @OnWindowExpiration is insufficient. The watermark needs to be
held back to the timestamp of the earliest element in the buffer.


Regarding windowing: I guess that if elements are not in the same window,
they are not expected to be in the same batch.

Batching should be possible across windows, as long as the
innerBatchDoFn does not take the Window (or window-dependent side
inputs) as parameters. Note in particular, if there is ever
non-trivial windowing, after a GBK each successive element is almost
certainly in a different window from its predecessor, which would make
emitting after each window change useless.


I'm just starting to work on these subjects, so I might lack a bit of
information;
what I am currently thinking about is that I need a way to know in the DoFn
that the window has expired so that I can call the perBatchFn even if
batchSize is not reached.  This is the @OnWindowExpiration callback that
Kenneth mentioned in an email about bundles.
Let's imagine that we have a collection of elements artificially timestamped
every 10 seconds (for simplicity of the example) and a fixed windowing of 1
minute. Then each window contains 6 elements. If we were to buffer the
elements by batches of 5 elements, then for each window we expect to get 2
batches (one of 5 elements, one of 1 element). For that to happen, we need a
@OnWindowExpiration on the DoFn where we call perBatchFn
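The arithmetic of this example can be checked with a small plain-Java sketch (no Beam dependency): elements timestamped every 10 seconds, fixed 1-minute windows, batch size 5. Windowing first partitions elements by timestamp; each window's 6 elements then split into batches of 5 and 1, the second of which only fires on window expiration.

```java
import java.util.ArrayList;
import java.util.List;

public class PerWindowBatches {
    // Assign a timestamp (in seconds) to the start of its 60s fixed window.
    static long windowStart(long timestampSec) {
        return (timestampSec / 60) * 60;
    }

    // Split one window's elements into batches of at most `batchSize`.
    static List<List<Long>> batchesFor(List<Long> windowElements, int batchSize) {
        List<List<Long>> batches = new ArrayList<>();
        for (int i = 0; i < windowElements.size(); i += batchSize) {
            batches.add(new ArrayList<>(windowElements.subList(
                    i, Math.min(i + batchSize, windowElements.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Long> window0 = new ArrayList<>();
        List<Long> window1 = new ArrayList<>();
        // 12 elements at t = 0s, 10s, ..., 110s -> two 1-minute windows.
        for (long t = 0; t < 120; t += 10) {
            (windowStart(t) == 0 ? window0 : window1).add(t);
        }
        // Each window holds 6 elements -> batches of sizes [5, 1].
        for (List<Long> w : List.of(window0, window1)) {
            List<List<Long>> batches = batchesFor(w, 5);
            if (batches.size() != 2 || batches.get(0).size() != 5
                    || batches.get(1).size() != 1) {
                throw new AssertionError();
            }
        }
        System.out.println("2 windows x 2 batches (5 + 1 elements each)");
    }
}
```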


As a composite transform this will likely require a group by key which may
affect performance. Maybe within a dofn is better.

Yes, the processing is done with a DoFn indeed.

However, without a GBK it is unclear 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
On Thu, Jan 26, 2017 at 6:58 PM, Kenneth Knowles  
wrote:
> On Thu, Jan 26, 2017 at 4:15 PM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
>
>> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
>>  wrote:
>> >
>> > you can't wrap DoFn's, period
>>
>> As a simple example, given a DoFn<A, B> it's perfectly natural to want
>> to "wrap" this as a DoFn<KV<K, A>, KV<K, B>>. State, side inputs,
>> windows, etc. would just be passed through.
>
>
>> The fact that this is complicated, with reflection and flexible
>> signatures and byte generation, is a property of the SDK (to provide a
>> flexible DoFn API). I agree that it's nice to hide this complexity
>> from the user, and it discourages this kind of composability.
>>
>
> 
> The difficulty of this sort of composability is a particularly bothersome
> issue for DoFn. It is solvable but the solutions may seem esoteric.
>
>  - Supporting wrapped _invocation_ is actually as easy as before if we
> chose to embrace it: ArgumentProvider is roughly the same thing as ye olde
> ProcessContext. We can easily provide it via our parameter mechanism, and
> DoFnInvokers can be used or we can also provide a DoFnInvoker for some
> requested DoFn.
>
>  - Supporting wrapped analysis is a bit uglier but I don't think there are
> technical blockers. It was actually quite bad prior to the new DoFn - you
> couldn't know statically whether the wrapper class had to "implements
> RequiresWindowAccess" so you would just have to do it "just in case". With
> the new DoFn I could imagine a `@Wraps public DoFn getWrapped()` though
> we'd still have to be able to merge this with whatever the wrapper requires
> - for example if they both use state there will be nothing calling out the
> fact that the wrapper needs to create a disjoint namespace. We could even
> make this required if there is an ArgumentProvider parameter and/or
> automatically provide a DoFnInvoker for this DoFn (so DoFnInvoker would be
> in the core SDK and DoFnInvokers would be only in the SDK Fn harness).
>
> So I think the analysis problems are fundamental, not part of the
> particulars of the new DoFn API or any particular SDK.
> 


My point is that the DoFn as a concept in the Beam model is
fundamentally, though not perfectly, composable. Both invocation and
analysis are functions of the SDK, and solvable, though perhaps not
easily (and/or requiring what would normally be considered
implementation details).


> Coming back to the ticket... I'm going to echo Ben's early point, and now
> Eugene's and Robert's that we should enumerate further use cases explicitly
> and preferably add them to the JIRA.
>
> Both SO questions are answered with essentially an inlined PTransform<PCollection<T>, PCollection<Iterable<T>>> with a maximum iterable size, for batched RPCs downstream. You
> can easily build timestamped and reified-windows versions without that
> transform being aware of it. It is simple to understand, as easy to
> implement a unified-model version via state & timers as those SO answers,
> and doesn't require DoFn nesting.

I think Eugene surfaced the key point, which is it depends on what one
does with the output.

Both of these emit "result" in the timestamp and window of whatever
the last element of the batch was, regardless of the other elements.
Of course they'll both break at runtime (with high probability) for
inputs windowed by anything but timestamp-invariant WindowFns like
GlobalWindows as the emit in finalize won't have an ambient window or
timestamp to assign to its output.

This is fine for writes or other terminal nodes without output (though
even writes may have output).

> I would love to learn more about use
> cases that either debunk this or refine it.

Anytime one wants to consume the output in a non-globally-windowed,
non-uniform-timestamp way is broken with the SO answers above. In
particular, streaming. (Adapting these to use state would restrict
batches to be per-window, per-key, and still not respect timestamps.)

The use cases I gave are ones where one follows the batch computation with an
"unbatch." Put another way, the batching allows amortization of
processing cost for logically independent elements. One would have to
do this by reifying windows and timestamps (holding back the timestamp
to the earliest member of the batch), storing elements in an
accumulator (per-bundle locally or cross-bundle using
(per-key-window(?)) state) processing the batch, and then restoring
windows and timestamp. However, the user of the API shouldn't have to
manually deal with the window-reified elements, i.e. they should
provide a List<T> -> List<O> DoFn rather than a List<WindowedValue<T>>
-> List<WindowedValue<O>> one, which is why I think nesting, or at
least wrapping, is required.
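The bookkeeping Robert describes can be sketched in plain Java (no Beam dependency; Stamped is a hypothetical stand-in for Beam's timestamped values): pair each buffered element with its timestamp, hold the "watermark" at the earliest buffered timestamp, and restore each element's original timestamp after the 1:1 batch computation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class TimestampedBuffer {
    static final class Stamped {
        final String value;
        final long timestampMillis;

        Stamped(String value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    private final List<Stamped> buffer = new ArrayList<>();

    void add(String value, long timestampMillis) {
        buffer.add(new Stamped(value, timestampMillis));
    }

    // The watermark hold: output cannot be considered complete before the
    // earliest buffered timestamp, or that element's result would be late.
    long watermarkHold() {
        long min = Long.MAX_VALUE;
        for (Stamped s : buffer) {
            min = Math.min(min, s.timestampMillis);
        }
        return min;
    }

    // Process the batch 1:1, in order, and restore each element's
    // original timestamp on its corresponding output.
    List<Stamped> processBatch(Function<String, String> perElementOfBatchFn) {
        List<Stamped> out = new ArrayList<>();
        for (Stamped s : buffer) {
            out.add(new Stamped(perElementOfBatchFn.apply(s.value), s.timestampMillis));
        }
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        TimestampedBuffer buf = new TimestampedBuffer();
        buf.add("a", 30);
        buf.add("b", 10);
        buf.add("c", 20);
        if (buf.watermarkHold() != 10) throw new AssertionError();
        List<Stamped> out = buf.processBatch(String::toUpperCase);
        // Outputs keep their own original timestamps, not the batch's.
        if (!out.get(1).value.equals("B") || out.get(1).timestampMillis != 10) {
            throw new AssertionError();
        }
        System.out.println("hold released; outputs keep original timestamps");
    }
}
```

This is exactly the detail the SO answers skip: they stamp the whole batch with the last element's timestamp instead.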

Any outputs other than 1:0 or 1:1 requires more manual thought as to
what the window and timestamp of the output should be (or restriction
of batches to single windows/timestamps, or rounding up timestamps to
the end of 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Jean-Baptiste Onofré
Hi Eugene

A simple way would be to create a BatchedDoFn in an extension.

WDYT ?

Regards
JB

On Jan 26, 2017, at 21:48, Eugene Kirpichov wrote:
>I don't think we should make batching a core feature of the Beam
>programming model (by adding it to DoFn as this code snippet implies).
>I'm
>reasonably sure there are less invasive ways of implementing it.
>On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
>wrote:
>
>> Agree, I'm curious as well.
>>
>> I guess it would be something like:
>>
>> .apply(ParDo(new DoFn() {
>>
>> @Override
>> public long batchSize() {
>>   return 1000;
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext context) {
>>   ...
>> }
>> }));
>>
>> If batchSize (overridden by the user) returns a positive long, then DoFn
>can
>> batch with this size.
>>
>> Regards
>> JB
>>
>> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>> > Hi Etienne,
>> >
>> > Could you post some snippets of how your transform is to be used in
>a
>> > pipeline? I think that would make it easier to discuss on this
>thread and
>> > could save a lot of churn if the discussion ends up leading to a
>> different
>> > API.
>> >
>> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot
>
>> > wrote:
>> >
>> >> Wonderful !
>> >>
>> >> Thanks Kenn !
>> >>
>> >> Etienne
>> >>
>> >>
>> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
>> >>> Hi Etienne,
>> >>>
>> >>> I was drafting a proposal about @OnWindowExpiration when this
>email
>> >>> arrived. I thought I would try to quickly unblock you by
>responding
>> with
>> >> a
>> >>> TL;DR: you can achieve your goals with state & timers as they
>currently
>> >>> exist. You'll set a timer for
>> window.maxTimestamp().plus(allowedLateness)
>> >>> precisely - when this timer fires, you are guaranteed that the
>input
>> >>> watermark has exceeded this point (so all new data is droppable)
>while
>> >> the
>> >>> output timestamp is held to this point (so you can safely output
>into
>> the
>> >>> window).
>> >>>
>> >>> @OnWindowExpiration is (1) a convenience to save you from needing
>a
>> >> handle
>> >>> on the allowed lateness (not a problem in your case) and (2)
>actually
>> >>> meaningful and potentially less expensive to implement in the
>absence
>> of
>> >>> state (this is why it needs a design discussion at all, really).
>> >>>
>> >>> Caveat: these APIs are new and not supported in every runner and
>> >> windowing
>> >>> configuration.
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot
>> >
>> >>> wrote:
>> >>>
>>  Hi,
>> 
>>  I have started to implement this ticket. For now it is
>implemented as
>> a
>>  PTransform that simply does ParDo.of(new DoFn) and all the
>processing
>>  related to batching is done in the DoFn.
>> 
>>  I'm starting to deal with windows and bundles (starting to take
>a look
>> >> at
>>  the State API to process trans-bundles, more questions about
>this to
>> >> come).
>>  My comments/questions are inline:
>> 
>> 
>>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>> 
>> > We should start by understanding the goals. If elements are in
>> >> different
>> > windows can they be out in the same batch? If they have
>different
>> > timestamps what timestamp should the batch have?
>> >
>>  Regarding timestamps: currently design is as so: the transform
>does
>> not
>>  group elements in the PCollection, so the "batch" does not exist
>as an
>>  element in the PCollection. There is only a user defined
>function
>>  (perBatchFn) that gets called when batchSize elements have been
>> >> processed.
>>  This function takes an ArrayList as parameter. So elements keep
>their
>>  original timestamps
>> 
>> 
>>  Regarding windowing: I guess that if elements are not in the
>same
>> >> window,
>>  they are not expected to be in the same batch.
>>  I'm just starting to work on these subjects, so I might lack a
>bit of
>>  information;
>>  what I am currently thinking about is that I need a way to know
>in the
>>  DoFn that the window has expired so that I can call the
>perBatchFn
>> even
>> >> if
>>  batchSize is not reached.  This is the @OnWindowExpiration
>callback
>> that
>>  Kenneth mentioned in an email about bundles.
>>  Let's imagine that we have a collection of elements artificially
>>  timestamped every 10 seconds (for simplicity of the example) and
>a
>> fixed
>>  windowing of 1 minute. Then each window contains 6 elements. If
>we
>> were
>> >> to
>>  buffer the elements by batches of 5 elements, then for each
>window we
>>  expect to get 2 batches (one of 5 elements, one of 1 element).
>For
>> that
>> >> to
>>  append, we need a @OnWindowExpiration on the DoFn where we call
>> >> perBatchFn
>> 
>>  As a composite transform 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Kenneth Knowles
On Thu, Jan 26, 2017 at 4:15 PM, Robert Bradshaw <
rober...@google.com.invalid> wrote:

> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
>  wrote:
> >
> > you can't wrap DoFn's, period
>
> As a simple example, given a DoFn<A, B> it's perfectly natural to want
> to "wrap" this as a DoFn<KV<K, A>, KV<K, B>>. State, side inputs,
> windows, etc. would just be passed through.


> The fact that this is complicated, with reflection and flexible
> signatures and byte generation, is a property of the SDK (to provide a
> flexible DoFn API). I agree that it's nice to hide this complexity
> from the user, and it discourages this kind of composability.
>


The difficulty of this sort of composability is a particularly bothersome
issue for DoFn. It is solvable but the solutions may seem esoteric.

 - Supporting wrapped _invocation_ is actually as easy as before if we
chose to embrace it: ArgumentProvider is roughly the same thing as ye olde
ProcessContext. We can easily provide it via our parameter mechanism, and
DoFnInvokers can be used or we can also provide a DoFnInvoker for some
requested DoFn.

 - Supporting wrapped analysis is a bit uglier but I don't think there are
technical blockers. It was actually quite bad prior to the new DoFn - you
couldn't know statically whether the wrapper class had to "implements
RequiresWindowAccess" so you would just have to do it "just in case". With
the new DoFn I could imagine a `@Wraps public DoFn getWrapped()` though
we'd still have to be able to merge this with whatever the wrapper requires
- for example if they both use state there will be nothing calling out the
fact that the wrapper needs to create a disjoint namespace. We could even
make this required if there is an ArgumentProvider parameter and/or
automatically provide a DoFnInvoker for this DoFn (so DoFnInvoker would be
in the core SDK and DoFnInvokers would be only in the SDK Fn harness).

So I think the analysis problems are fundamental, not part of the
particulars of the new DoFn API or any particular SDK.


Coming back to the ticket... I'm going to echo Ben's early point, and now
Eugene's and Robert's that we should enumerate further use cases explicitly
and preferably add them to the JIRA.

Both SO questions are answered with essentially an inlined PTransform<PCollection<T>, PCollection<Iterable<T>>> with a maximum iterable size, for batched RPCs downstream. You
can easily build timestamped and reified-windows versions without that
transform being aware of it. It is simple to understand, as easy to
implement a unified-model version via state & timers as those SO answers,
and doesn't require DoFn nesting. I would love to learn more about use
cases that either debunk this or refine it. Also one transform does not
need to serve all uses; we just need it to serve its intended use properly
and try not to tempt misuse.

Focusing briefly on the windowing issues, the outside world is globally
windowed. So in those communications the data is in the global window
whether you want it to be or not. IMO with rare exceptions rewindowing to
the global window silently (such as by making an RPC ignoring the window)
is data loss (or maybe just well-documented data discarding :-). So
window-aware write transforms (with special case of the global window being
"already ready") are a good idea from the start. It probably makes sense to
have consistently windowed output from a window-aware write, so the above
transform should operate per-window or else reify windows, batch in the
global window, then restore windows (requires BEAM-1287 or a WindowFn).

Kenn


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
On Thu, Jan 26, 2017 at 4:20 PM, Ben Chambers
 wrote:
> Here's an example API that would make this part of a DoFn. The idea here is
> that it would still be run as `ParDo.of(new MyBatchedDoFn())`, but the
> runner (and DoFnRunner) could see that it has asked for batches, so rather
> than calling a `processElement` on every input `I`, it assembles a
> `Collection` and then calls the method.
>
> Possible API making this part of DoFn (with a fixed size):
>
> public class MyBatchedDoFn extends DoFn {
>   @ProcessBatch(size = 50)
>   public void processBatch(ProcessContext c) {
> Collection batchContents = c.element();
> ...
>   }
> }
>
> Possible API making this part of DoFn (with dynamic size):
>
> public class MyBatchedDoFn extends DoFn {
>   @ProcessBatch
>   public boolean processBatch(ProcessContext c) {
> Collection batchContents = c.element();
> if (batchContents.size() < 50) {
>   return false; // batch not yet processed
> }
>
> ...
> return true;
>   }
> }

Or even

public class MyBatchedDoFn extends DoFn {
  public void processElement(Iterable batch) {
[process the batch]
  }
}

though I'd rather this not be baked into the DoFn API if it can be
solved separately.
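Ben's dynamic-size @ProcessBatch idea can be simulated in plain Java (no Beam dependency; all names here are hypothetical): the "runner" keeps offering the accumulated batch, and the user method returns false for "not yet" or true to consume it, leaving any remainder for end-of-bundle handling.

```java
import java.util.ArrayList;
import java.util.List;

public class DynamicBatchRunner {
    interface BatchFn<T> {
        // Returns true if the batch was processed (and may be discarded).
        boolean processBatch(List<T> batch);
    }

    static <T> List<T> run(List<T> input, BatchFn<T> fn) {
        List<T> pending = new ArrayList<>();
        for (T element : input) {
            pending.add(element);
            if (fn.processBatch(pending)) {
                pending.clear(); // batch consumed
            }
        }
        // Leftover the runner must still deal with at end of bundle.
        return pending;
    }

    public static void main(String[] args) {
        List<List<Integer>> processed = new ArrayList<>();
        List<Integer> leftover = run(List.of(1, 2, 3, 4, 5, 6, 7), batch -> {
            if (batch.size() < 3) {
                return false; // batch not yet processed
            }
            processed.add(new ArrayList<>(batch));
            return true;
        });
        if (processed.size() != 2 || !leftover.equals(List.of(7))) {
            throw new AssertionError();
        }
        System.out.println(processed + " leftover=" + leftover);
    }
}
```

The leftover list makes the open question concrete: someone, either the runner or the user, must decide what happens to a partial batch at bundle or window end.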


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Ben Chambers
The third option for batching:

- Functionality within the DoFn and DoFnRunner built as part of the SDK.

I haven't thought through Batching, but at least for the
IntraBundleParallelization use case this actually does make sense to expose
as a part of the model. Knowing that a DoFn supports parallelization, a
runner may want to control how much parallelization is allowed, and the
DoFn also needs to make sure to wait on all those threads (and make sure
they're properly setup for logging/metrics/etc. associated with the current
step).

There may be good reasons to make this a property of a DoFn that the runner
can inspect, and support. For instance, if a DoFn wants to process batches
of 50, it may be possible to factor that into how input is split/bundled.

On Thu, Jan 26, 2017 at 3:49 PM Kenneth Knowles 
wrote:

> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > The class for invoking DoFn's,
> > DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> > good reason.
> >
>
> This would be true if it weren't for that pesky DoFnTester :-)
>
> And even if we solve that problem, in the future it will be in the SDK's Fn
> Harness.
>


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Kenneth Knowles
On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> The class for invoking DoFn's,
> DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> good reason.
>

This would be true if it weren't for that pesky DoFnTester :-)

And even if we solve that problem, in the future it will be in the SDK's Fn
Harness.


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Eugene Kirpichov
I agree that wrapping the DoFn is probably not the way to go, because the
DoFn may be quite tricky due to all the reflective features: e.g. how do
you automatically "batch" a DoFn that uses state and timers? What about a
DoFn that uses a BoundedWindow parameter? What about a splittable DoFn?
What about future reflective features? The class for invoking DoFn's,
DoFnInvokers, is absent from the SDK (and present in runners-core) for a
good reason.

I'd rather leave the intricacies of invoking DoFn's to runners, and say
that you can't wrap DoFn's, period - "adapter", "decorator" and other
design patterns just don't apply to DoFn's.

The two options for batching are:
- A transform that takes elements and produces batches, like Robert said
- A simple Beam-agnostic library that takes Java objects and produces
batches of Java objects, with an API that makes it convenient to use in a
typical batching DoFn

On Thu, Jan 26, 2017 at 3:31 PM Ben Chambers 
wrote:

> I think that wrapping the DoFn is tricky -- we backed out
> IntraBundleParallelization because it did that, and it has weird
> interactions with both the reflective DoFn and windowing. We could maybe
> make some kind of "DoFnDelegatingDoFn" that could act as a base class and
> get some of that right, but...
>
> One question I have is whether this batching should be "make batches of N
> and if you need to wait for the Nth element do so" or "make batches of at
> most N but don't wait too long if you don't get to N". In the former case,
> we'll need to do something to buffer elements between bundles -- whether
> this is using State or a GroupByKey, etc. In the latter case, the buffering
> can happen entirely within a bundle -- if you get to the end of the bundle
> and only have 5 elements, even if 5 < N, process that as a batch (rather
> than shifting it somewhere else).
>
> On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw
> 
> wrote:
>
> > On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
> >  wrote:
> > > I don't think we should make batching a core feature of the Beam
> > > programming model (by adding it to DoFn as this code snippet implies).
> > I'm
> > > reasonably sure there are less invasive ways of implementing it.
> >
> > +1, either as a PTransform or a DoFn that
> > wraps/delegates to a DoFn.
> >
> > > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré  >
> > > wrote:
> > >
> > >> Agree, I'm curious as well.
> > >>
> > >> I guess it would be something like:
> > >>
> > >> .apply(ParDo(new DoFn() {
> > >>
> > >> @Override
> > >> public long batchSize() {
> > >>   return 1000;
> > >> }
> > >>
> > >> @ProcessElement
> > >> public void processElement(ProcessContext context) {
> > >>   ...
> > >> }
> > >> }));
> > >>
> > >> If batchSize (overridden by the user) returns a positive long, then DoFn
> can
> > >> batch with this size.
> > >>
> > >> Regards
> > >> JB
> > >>
> > >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> > >> > Hi Etienne,
> > >> >
> > >> > Could you post some snippets of how your transform is to be used in
> a
> > >> > pipeline? I think that would make it easier to discuss on this
> thread
> > and
> > >> > could save a lot of churn if the discussion ends up leading to a
> > >> different
> > >> > API.
> > >> >
> > >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <
> echauc...@gmail.com
> > >
> > >> > wrote:
> > >> >
> > >> >> Wonderful !
> > >> >>
> > >> >> Thanks Kenn !
> > >> >>
> > >> >> Etienne
> > >> >>
> > >> >>
> > >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> > >> >>> Hi Etienne,
> > >> >>>
> > >> >>> I was drafting a proposal about @OnWindowExpiration when this
> email
> > >> >>> arrived. I thought I would try to quickly unblock you by
> responding
> > >> with
> > >> >> a
> > >> >>> TL;DR: you can achieve your goals with state & timers as they
> > currently
> > >> >>> exist. You'll set a timer for
> > >> window.maxTimestamp().plus(allowedLateness)
> > >> >>> precisely - when this timer fires, you are guaranteed that the
> input
> > >> >>> watermark has exceeded this point (so all new data is droppable)
> > while
> > >> >> the
> > >> >>> output timestamp is held to this point (so you can safely output
> > into
> > >> the
> > >> >>> window).
> > >> >>>
> > >> >>> @OnWindowExpiration is (1) a convenience to save you from needing
> a
> > >> >> handle
> > >> >>> on the allowed lateness (not a problem in your case) and (2)
> > actually
> > >> >>> meaningful and potentially less expensive to implement in the
> > absence
> > >> of
> > >> >>> state (this is why it needs a design discussion at all, really).
> > >> >>>
> > >> >>> Caveat: these APIs are new and not supported in every runner and
> > >> >> windowing
> > >> >>> configuration.
> > >> >>>
> > >> >>> Kenn
> > >> >>>
> > >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <
> > 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
On Thu, Jan 26, 2017 at 3:31 PM, Ben Chambers
 wrote:
> I think that wrapping the DoFn is tricky -- we backed out
> IntraBundleParallelization because it did that, and it has weird
> interactions with both the reflective DoFn and windowing. We could maybe
> make some kind of "DoFnDelegatingDoFn" that could act as a base class and
> get some of that right, but...

Yeah, this is a lot trickier with NewDoFn, which is unfortunate as
this isn't the only case where we want to make DoFns more composable.

> One question I have is whether this batching should be "make batches of N
> and if you need to wait for the Nth element do so" or "make batches of at
> most N but don't wait too long if you don't get to N". In the former case,
> we'll need to do something to buffer elements between bundles -- whether
> this is using State or a GroupByKey, etc. In the latter case, the buffering
> can happen entirely within a bundle -- if you get to the end of the bundle
> and only have 5 elements, even if 5 < N, process that as a batch (rather
> than shifting it somewhere else).

I think the "make batches of at most N but don't wait too long if you
don't get to N" is a very useful first (and tractable) start that can
be built on.
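
[Editor's sketch] The "batches of at most N, entirely within a bundle" variant described above can be illustrated without the Beam SDK. The names here (BundleBatcher, processElement, finishBundle, perBatchFn) mirror the DoFn lifecycle from the discussion but are hypothetical, not Beam API: buffer in processElement, flush at N, and flush the trailing partial batch (the "5 < N" case) at end of bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative, non-Beam sketch of within-bundle batching of at most N elements.
class BundleBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> perBatchFn; // user-defined batch callback
    private final List<T> buffer = new ArrayList<>();

    BundleBatcher(int batchSize, Consumer<List<T>> perBatchFn) {
        this.batchSize = batchSize;
        this.perBatchFn = perBatchFn;
    }

    // Analogous to @ProcessElement: buffer, flush when the batch is full.
    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Analogous to @FinishBundle: emit the partial batch even if it is smaller
    // than batchSize, rather than carrying state across bundles.
    void finishBundle() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        perBatchFn.accept(new ArrayList<>(buffer)); // hand out a copy
        buffer.clear();
    }
}
```

With 7 elements and batchSize 5, this yields one batch of 5 and, at finishBundle, one batch of 2 — no cross-bundle state or GroupByKey needed.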

> On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw 
> wrote:
>
>> On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
>>  wrote:
>> > I don't think we should make batching a core feature of the Beam
>> > programming model (by adding it to DoFn as this code snippet implies).
>> I'm
>> > reasonably sure there are less invasive ways of implementing it.
>>
>> +1, either as a PTransform or a DoFn that
>> wraps/delegates to a DoFn.
>>
>> > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
>> > wrote:
>> >
>> >> Agree, I'm curious as well.
>> >>
>> >> I guess it would be something like:
>> >>
>> >> .apply(ParDo(new DoFn() {
>> >>
>> >> @Override
>> >> public long batchSize() {
>> >>   return 1000;
>> >> }
>> >>
>> >> @ProcessElement
>> >> public void processElement(ProcessContext context) {
>> >>   ...
>> >> }
>> >> }));
>> >>
>> >> If batchSize (overridden by the user) returns a positive long, then DoFn can
>> >> batch with this size.
>> >>
>> >> Regards
>> >> JB
>> >>
>> >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>> >> > Hi Etienne,
>> >> >
>> >> > Could you post some snippets of how your transform is to be used in a
>> >> > pipeline? I think that would make it easier to discuss on this thread
>> and
>> >> > could save a lot of churn if the discussion ends up leading to a
>> >> different
>> >> > API.
>> >> >
>> >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot > >
>> >> > wrote:
>> >> >
>> >> >> Wonderful !
>> >> >>
>> >> >> Thanks Kenn !
>> >> >>
>> >> >> Etienne
>> >> >>
>> >> >>
>> >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
>> >> >>> Hi Etienne,
>> >> >>>
>> >> >>> I was drafting a proposal about @OnWindowExpiration when this email
>> >> >>> arrived. I thought I would try to quickly unblock you by responding
>> >> with
>> >> >> a
>> >> >>> TL;DR: you can achieve your goals with state & timers as they
>> currently
>> >> >>> exist. You'll set a timer for
>> >> window.maxTimestamp().plus(allowedLateness)
>> >> >>> precisely - when this timer fires, you are guaranteed that the input
>> >> >>> watermark has exceeded this point (so all new data is droppable)
>> while
>> >> >> the
>> >> >>> output timestamp is held to this point (so you can safely output
>> into
>> >> the
>> >> >>> window).
>> >> >>>
>> >> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
>> >> >> handle
>> >> >>> on the allowed lateness (not a problem in your case) and (2)
>> actually
>> >> >>> meaningful and potentially less expensive to implement in the
>> absence
>> >> of
>> >> >>> state (this is why it needs a design discussion at all, really).
>> >> >>>
>> >> >>> Caveat: these APIs are new and not supported in every runner and
>> >> >> windowing
>> >> >>> configuration.
>> >> >>>
>> >> >>> Kenn
>> >> >>>
>> >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <
>> echauc...@gmail.com
>> >> >
>> >> >>> wrote:
>> >> >>>
>> >>  Hi,
>> >> 
>> >>  I have started to implement this ticket. For now it is implemented
>> as
>> >> a
>> >>  PTransform that simply does ParDo.of(new DoFn) and all the
>> processing
>> >>  related to batching is done in the DoFn.
>> >> 
>> >>  I'm starting to deal with windows and bundles (starting to take a
>> look
>> >> >> at
>> >>  the State API to process trans-bundles, more questions about this
>> to
>> >> >> come).
>> >>  My comments/questions are inline:
>> >> 
>> >> 
>> >>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>> >> 
>> >> > We should start by understanding the goals. If elements are in
>> >> >> different

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Ben Chambers
I think that wrapping the DoFn is tricky -- we backed out
IntraBundleParallelization because it did that, and it has weird
interactions with both the reflective DoFn and windowing. We could maybe
make some kind of "DoFnDelegatingDoFn" that could act as a base class and
get some of that right, but...

One question I have is whether this batching should be "make batches of N
and if you need to wait for the Nth element do so" or "make batches of at
most N but don't wait too long if you don't get to N". In the former case,
we'll need to do something to buffer elements between bundles -- whether
this is using State or a GroupByKey, etc. In the latter case, the buffering
can happen entirely within a bundle -- if you get to the end of the bundle
and only have 5 elements, even if 5 < N, process that as a batch (rather
than shifting it somewhere else).

On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw 
wrote:

> On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
>  wrote:
> > I don't think we should make batching a core feature of the Beam
> > programming model (by adding it to DoFn as this code snippet implies).
> I'm
> > reasonably sure there are less invasive ways of implementing it.
>
> +1, either as a PTransform or a DoFn that
> wraps/delegates to a DoFn.
>
> > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
> > wrote:
> >
> >> Agree, I'm curious as well.
> >>
> >> I guess it would be something like:
> >>
> >> .apply(ParDo(new DoFn() {
> >>
> >> @Override
> >> public long batchSize() {
> >>   return 1000;
> >> }
> >>
> >> @ProcessElement
> >> public void processElement(ProcessContext context) {
> >>   ...
> >> }
> >> }));
> >>
> >> If batchSize (overridden by the user) returns a positive long, then DoFn can
> >> batch with this size.
> >>
> >> Regards
> >> JB
> >>
> >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> >> > Hi Etienne,
> >> >
> >> > Could you post some snippets of how your transform is to be used in a
> >> > pipeline? I think that would make it easier to discuss on this thread
> and
> >> > could save a lot of churn if the discussion ends up leading to a
> >> different
> >> > API.
> >> >
> >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot  >
> >> > wrote:
> >> >
> >> >> Wonderful !
> >> >>
> >> >> Thanks Kenn !
> >> >>
> >> >> Etienne
> >> >>
> >> >>
> >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> >> >>> Hi Etienne,
> >> >>>
> >> >>> I was drafting a proposal about @OnWindowExpiration when this email
> >> >>> arrived. I thought I would try to quickly unblock you by responding
> >> with
> >> >> a
> >> >>> TL;DR: you can achieve your goals with state & timers as they
> currently
> >> >>> exist. You'll set a timer for
> >> window.maxTimestamp().plus(allowedLateness)
> >> >>> precisely - when this timer fires, you are guaranteed that the input
> >> >>> watermark has exceeded this point (so all new data is droppable)
> while
> >> >> the
> >> >>> output timestamp is held to this point (so you can safely output
> into
> >> the
> >> >>> window).
> >> >>>
> >> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
> >> >> handle
> >> >>> on the allowed lateness (not a problem in your case) and (2)
> actually
> >> >>> meaningful and potentially less expensive to implement in the
> absence
> >> of
> >> >>> state (this is why it needs a design discussion at all, really).
> >> >>>
> >> >>> Caveat: these APIs are new and not supported in every runner and
> >> >> windowing
> >> >>> configuration.
> >> >>>
> >> >>> Kenn
> >> >>>
> >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <
> echauc...@gmail.com
> >> >
> >> >>> wrote:
> >> >>>
> >>  Hi,
> >> 
> >>  I have started to implement this ticket. For now it is implemented
> as
> >> a
> >>  PTransform that simply does ParDo.of(new DoFn) and all the
> processing
> >>  related to batching is done in the DoFn.
> >> 
> >>  I'm starting to deal with windows and bundles (starting to take a
> look
> >> >> at
> >>  the State API to process trans-bundles, more questions about this
> to
> >> >> come).
> >>  My comments/questions are inline:
> >> 
> >> 
> >>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
> >> 
> >> > We should start by understanding the goals. If elements are in
> >> >> different
> >> > windows can they be out in the same batch? If they have different
> >> > timestamps what timestamp should the batch have?
> >> >
> >>  Regarding timestamps: currently design is as so: the transform does
> >> not
> >>  group elements in the PCollection, so the "batch" does not exist
> as an
> >>  element in the PCollection. There is only a user defined function
> >>  (perBatchFn) that gets called when batchSize elements have been
> >> >> processed.
> >>  This function takes an 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
 wrote:
> I don't think we should make batching a core feature of the Beam
> programming model (by adding it to DoFn as this code snippet implies). I'm
> reasonably sure there are less invasive ways of implementing it.

+1, either as a PTransform or a DoFn that
wraps/delegates to a DoFn.

> On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
> wrote:
>
>> Agree, I'm curious as well.
>>
>> I guess it would be something like:
>>
>> .apply(ParDo(new DoFn() {
>>
>> @Override
>> public long batchSize() {
>>   return 1000;
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext context) {
>>   ...
>> }
>> }));
>>
>> If batchSize (overridden by the user) returns a positive long, then DoFn can
>> batch with this size.
>>
>> Regards
>> JB
>>
>> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>> > Hi Etienne,
>> >
>> > Could you post some snippets of how your transform is to be used in a
>> > pipeline? I think that would make it easier to discuss on this thread and
>> > could save a lot of churn if the discussion ends up leading to a
>> different
>> > API.
>> >
>> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot 
>> > wrote:
>> >
>> >> Wonderful !
>> >>
>> >> Thanks Kenn !
>> >>
>> >> Etienne
>> >>
>> >>
>> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
>> >>> Hi Etienne,
>> >>>
>> >>> I was drafting a proposal about @OnWindowExpiration when this email
>> >>> arrived. I thought I would try to quickly unblock you by responding
>> with
>> >> a
>> >>> TL;DR: you can achieve your goals with state & timers as they currently
>> >>> exist. You'll set a timer for
>> window.maxTimestamp().plus(allowedLateness)
>> >>> precisely - when this timer fires, you are guaranteed that the input
>> >>> watermark has exceeded this point (so all new data is droppable) while
>> >> the
>> >>> output timestamp is held to this point (so you can safely output into
>> the
>> >>> window).
>> >>>
>> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
>> >> handle
>> >>> on the allowed lateness (not a problem in your case) and (2) actually
>> >>> meaningful and potentially less expensive to implement in the absence
>> of
>> >>> state (this is why it needs a design discussion at all, really).
>> >>>
>> >>> Caveat: these APIs are new and not supported in every runner and
>> >> windowing
>> >>> configuration.
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot > >
>> >>> wrote:
>> >>>
>>  Hi,
>> 
>>  I have started to implement this ticket. For now it is implemented as
>> a
>>  PTransform that simply does ParDo.of(new DoFn) and all the processing
>>  related to batching is done in the DoFn.
>> 
>>  I'm starting to deal with windows and bundles (starting to take a look
>> >> at
>>  the State API to process trans-bundles, more questions about this to
>> >> come).
>>  My comments/questions are inline:
>> 
>> 
>>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>> 
>> > We should start by understanding the goals. If elements are in
>> >> different
>> > windows can they be out in the same batch? If they have different
>> > timestamps what timestamp should the batch have?
>> >
>>  Regarding timestamps: currently design is as so: the transform does
>> not
>>  group elements in the PCollection, so the "batch" does not exist as an
>>  element in the PCollection. There is only a user defined function
>>  (perBatchFn) that gets called when batchSize elements have been
>> >> processed.
>>  This function takes an ArrayList as parameter. So elements keep their
>>  original timestamps
>> 
>> 
>>  Regarding windowing: I guess that if elements are not in the same
>> >> window,
>>  they are not expected to be in the same batch.
>>  I'm just starting to work on these subjects, so I might lack a bit of
>>  information;
>>  what I am currently thinking about is that I need a way to know in the
>>  DoFn that the window has expired so that I can call the perBatchFn
>> even
>> >> if
>>  batchSize is not reached.  This is the @OnWindowExpiration callback
>> that
>>  Kenneth mentioned in an email about bundles.
>>  Lets imagine that we have a collection of elements artificially
>>  timestamped every 10 seconds (for simplicity of the example) and a
>> fixed
>>  windowing of 1 minute. Then each window contains 6 elements. If we
>> were
>> >> to
>>  buffer the elements by batches of 5 elements, then for each window we
>>  expect to get 2 batches (one of 5 elements, one of 1 element). For
>> that
>> >> to
>>  happen, we need a @OnWindowExpiration on the DoFn where we call
>> >> perBatchFn
>> 
>>  As a composite transform this will likely 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Eugene Kirpichov
I don't think we should make batching a core feature of the Beam
programming model (by adding it to DoFn as this code snippet implies). I'm
reasonably sure there are less invasive ways of implementing it.
On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
wrote:

> Agree, I'm curious as well.
>
> I guess it would be something like:
>
> .apply(ParDo(new DoFn() {
>
> @Override
> public long batchSize() {
>   return 1000;
> }
>
> @ProcessElement
> public void processElement(ProcessContext context) {
>   ...
> }
> }));
>
> If batchSize (overridden by the user) returns a positive long, then DoFn can
> batch with this size.
>
> Regards
> JB
>
> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> > Hi Etienne,
> >
> > Could you post some snippets of how your transform is to be used in a
> > pipeline? I think that would make it easier to discuss on this thread and
> > could save a lot of churn if the discussion ends up leading to a
> different
> > API.
> >
> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot 
> > wrote:
> >
> >> Wonderful !
> >>
> >> Thanks Kenn !
> >>
> >> Etienne
> >>
> >>
> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> >>> Hi Etienne,
> >>>
> >>> I was drafting a proposal about @OnWindowExpiration when this email
> >>> arrived. I thought I would try to quickly unblock you by responding
> with
> >> a
> >>> TL;DR: you can achieve your goals with state & timers as they currently
> >>> exist. You'll set a timer for
> window.maxTimestamp().plus(allowedLateness)
> >>> precisely - when this timer fires, you are guaranteed that the input
> >>> watermark has exceeded this point (so all new data is droppable) while
> >> the
> >>> output timestamp is held to this point (so you can safely output into
> the
> >>> window).
> >>>
> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
> >> handle
> >>> on the allowed lateness (not a problem in your case) and (2) actually
> >>> meaningful and potentially less expensive to implement in the absence
> of
> >>> state (this is why it needs a design discussion at all, really).
> >>>
> >>> Caveat: these APIs are new and not supported in every runner and
> >> windowing
> >>> configuration.
> >>>
> >>> Kenn
> >>>
> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot  >
> >>> wrote:
> >>>
>  Hi,
> 
>  I have started to implement this ticket. For now it is implemented as
> a
>  PTransform that simply does ParDo.of(new DoFn) and all the processing
>  related to batching is done in the DoFn.
> 
>  I'm starting to deal with windows and bundles (starting to take a look
> >> at
>  the State API to process trans-bundles, more questions about this to
> >> come).
>  My comments/questions are inline:
> 
> 
>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
> 
> > We should start by understanding the goals. If elements are in
> >> different
> > windows can they be out in the same batch? If they have different
> > timestamps what timestamp should the batch have?
> >
>  Regarding timestamps: currently design is as so: the transform does
> not
>  group elements in the PCollection, so the "batch" does not exist as an
>  element in the PCollection. There is only a user defined function
>  (perBatchFn) that gets called when batchSize elements have been
> >> processed.
>  This function takes an ArrayList as parameter. So elements keep their
>  original timestamps
> 
> 
>  Regarding windowing: I guess that if elements are not in the same
> >> window,
>  they are not expected to be in the same batch.
>  I'm just starting to work on these subjects, so I might lack a bit of
>  information;
>  what I am currently thinking about is that I need a way to know in the
>  DoFn that the window has expired so that I can call the perBatchFn
> even
> >> if
>  batchSize is not reached.  This is the @OnWindowExpiration callback
> that
>  Kenneth mentioned in an email about bundles.
>  Lets imagine that we have a collection of elements artificially
>  timestamped every 10 seconds (for simplicity of the example) and a
> fixed
>  windowing of 1 minute. Then each window contains 6 elements. If we
> were
> >> to
>  buffer the elements by batches of 5 elements, then for each window we
>  expect to get 2 batches (one of 5 elements, one of 1 element). For
> that
> >> to
>  happen, we need a @OnWindowExpiration on the DoFn where we call
> >> perBatchFn
> 
>  As a composite transform this will likely require a group by key which
> >> may
> > affect performance. Maybe within a dofn is better.
> >
>  Yes, the processing is done with a DoFn indeed.
> 
> > Then it could be some annotation or API that informs the runner.
> Should
> > batch sizes be fixed in the annotation (element count or size) or
> >> should
> > 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Jean-Baptiste Onofré

Agree, I'm curious as well.

I guess it would be something like:

.apply(ParDo(new DoFn() {

   @Override
   public long batchSize() {
 return 1000;
   }

   @ProcessElement
   public void processElement(ProcessContext context) {
 ...
   }
}));

If batchSize (overridden by the user) returns a positive long, then the DoFn can
batch with this size.


Regards
JB

On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:

Hi Etienne,

Could you post some snippets of how your transform is to be used in a
pipeline? I think that would make it easier to discuss on this thread and
could save a lot of churn if the discussion ends up leading to a different
API.

On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot 
wrote:


Wonderful !

Thanks Kenn !

Etienne


Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding with

a

TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
precisely - when this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while

the

output timestamp is held to this point (so you can safely output into the
window).

@OnWindowExpiration is (1) a convenience to save you from needing a

handle

on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence of
state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and

windowing

configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot 
wrote:


Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look

at

the State API to process trans-bundles, more questions about this to

come).

My comments/questions are inline:


Le 17/01/2017 à 18:41, Ben Chambers a écrit :


We should start by understanding the goals. If elements are in

different

windows can they be out in the same batch? If they have different
timestamps what timestamp should the batch have?


Regarding timestamps: currently design is as so: the transform does not
group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user defined function
(perBatchFn) that gets called when batchSize elements have been

processed.

This function takes an ArrayList as parameter. So elements keep their
original timestamps


Regarding windowing: I guess that if elements are not in the same

window,

they are not expected to be in the same batch.
I'm just starting to work on these subjects, so I might lack a bit of
information;
what I am currently thinking about is that I need a way to know in the
DoFn that the window has expired so that I can call the perBatchFn even

if

batchSize is not reached.  This is the @OnWindowExpiration callback that
Kenneth mentioned in an email about bundles.
Let's imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a fixed
windowing of 1 minute. Then each window contains 6 elements. If we were

to

buffer the elements by batches of 5 elements, then for each window we
expect to get 2 batches (one of 5 elements, one of 1 element). For that
to happen, we need a @OnWindowExpiration on the DoFn where we call

perBatchFn


As a composite transform this will likely require a group by key which

may

affect performance. Maybe within a dofn is better.


Yes, the processing is done with a DoFn indeed.


Then it could be some annotation or API that informs the runner. Should
batch sizes be fixed in the annotation (element count or size) or

should

the user have some method that lets them decide when to process a batch
based on the contents?


For now, the user passes batchSize as an argument to BatchParDo.via(); it
is a number of elements. But batching based on content might be useful for
the user. Giving a hint to the runner might be more flexible for the runner.

Thanks.



Another thing to think about is whether this should be connected to the
ability to run parts of the bundle in parallel.


Yes!


Maybe each batch is an RPC
and you just want to start an async RPC for each batch. Then in

addition

to
start the final RPC in finishBundle, you also need to wait for all the
RPCs
to complete.


Actually, currently each batch's processing is whatever the user wants
(the perBatchFn user-defined function). If the user decides to issue an
async RPC in that function (called with the ArrayList of input elements),
IMHO he is responsible for waiting for the response in that method if he 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
First off, let me say that a *correctly* batching DoFn is a lot of
value, especially because it's (too) easy to (often unknowingly)
implement it incorrectly.

My take is that a BatchingParDo should be a PTransform that takes a
DoFn<Iterable<InputT>, ? extends Iterable<OutputT>> as a parameter, as
well as some (optional?) batching
criteria (probably batch size and/or batch timeout). The DoFn should
map the set of inputs to a set of outputs of the same size and in the
same order as the input (or, possibly, an empty list would be
acceptable). Semantically, it should be defined as

public PCollection<OutputT> expand(PCollection<InputT> input) {
  return input
      .apply(e -> SingletonList.of(e))
      .apply(parDo(batchDoFn))
      .apply(es -> Iterables.onlyElement(es));
}

Getting this correct wrt timestamps and windowing is tricky. However,
even something that handles the most trivial case (e.g. GlobalWindows
only) and degenerates to batch sizes of 1 for other cases would allow
people to start using this code (rather than rolling their own) and we
could then continue to refine it.
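
[Editor's sketch] The contract stated above — wrap each element in a singleton list, apply the batch DoFn, unwrap — can be written out in plain Java to make it concrete. BatchSemantics and both method names are hypothetical, not Beam API: reference() is the batch-size-1 semantics, and batched() shows that any batching of the same size-and-order-preserving function must produce the identical output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative, non-Beam rendering of the semantic contract for a batching ParDo.
class BatchSemantics {
    // Reference semantics: each element processed as a batch of one, then unwrapped.
    static <I, O> List<O> reference(List<I> input, Function<List<I>, List<O>> batchFn) {
        return input.stream()
                .map(e -> batchFn.apply(List.of(e)))     // singleton batches
                .map(out -> {
                    if (out.size() != 1) {
                        throw new IllegalStateException("batch fn must preserve size");
                    }
                    return out.get(0);                   // unwrap the singleton
                })
                .collect(Collectors.toList());
    }

    // A batched execution with batch size n; same size, same order as the input.
    static <I, O> List<O> batched(List<I> input, Function<List<I>, List<O>> batchFn, int n) {
        List<O> out = new ArrayList<>();
        for (int i = 0; i < input.size(); i += n) {
            out.addAll(batchFn.apply(input.subList(i, Math.min(i + n, input.size()))));
        }
        return out;
    }
}
```

For any element-wise function lifted over lists, batched() and reference() agree regardless of n, which is exactly the equivalence the expand() definition above demands.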

More responses inline below.

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot  wrote:
> Hi,
>
> I have started to implement this ticket. For now it is implemented as a
> PTransform that simply does ParDo.of(new DoFn) and all the processing
> related to batching is done in the DoFn.
>
> I'm starting to deal with windows and bundles (starting to take a look at
> the State API to process trans-bundles, more questions about this to come).
> My comments/questions are inline:
>
> Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>>
>> We should start by understanding the goals. If elements are in different
>> windows can they be out in the same batch? If they have different
>> timestamps what timestamp should the batch have?
>
> Regarding timestamps: currently design is as so: the transform does not
> group elements in the PCollection, so the "batch" does not exist as an
> element in the PCollection. There is only a user defined function
> (perBatchFn) that gets called when batchSize elements have been processed.
> This function takes an ArrayList as parameter. So elements keep their
> original timestamps

Correct, elements must keep their original timestamps. This is one
reason @OnWindowExpiration is insufficient. The watermark needs to be
held back to the timestamp of the earliest element in the buffer.
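
[Editor's sketch] The arithmetic behind Kenn's timer recipe and the watermark hold can be illustrated with plain java.time (BatchTimers is a made-up name; Beam's own Instant and BoundedWindow types are not used here): the cleanup timer fires at window.maxTimestamp() plus allowedLateness, while the output watermark is held at the earliest buffered element's timestamp so buffered elements can still be emitted with their original times.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Illustrative, non-Beam arithmetic for the timer target and the watermark hold.
class BatchTimers {
    // Kenn's recipe: fire the cleanup timer at window.maxTimestamp() + allowedLateness.
    static Instant cleanupTimer(Instant windowMaxTimestamp, Duration allowedLateness) {
        return windowMaxTimestamp.plus(allowedLateness);
    }

    // Robert's point: the hold is the earliest buffered timestamp, so outputs
    // emitted later are not considered late relative to their own timestamps.
    static Instant watermarkHold(List<Instant> bufferedTimestamps) {
        return bufferedTimestamps.stream()
                .min(Instant::compareTo)
                .orElseThrow(() -> new IllegalStateException("empty buffer holds nothing"));
    }
}
```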

> Regarding windowing: I guess that if elements are not in the same window,
> they are not expected to be in the same batch.

Batching should be possible across windows, as long as the
innerBatchDoFn does not take the Window (or window-dependent side
inputs) as parameters. Note in particular, if there is ever
non-trivial windowing, after a GBK each successive element is almost
certainly in a different window from its predecessor, which would make
emitting after each window change useless.

> I'm just starting to work on these subjects, so I might lack a bit of
> information;
> what I am currently thinking about is that I need a way to know in the DoFn
> that the window has expired so that I can call the perBatchFn even if
> batchSize is not reached.  This is the @OnWindowExpiration callback that
> Kenneth mentioned in an email about bundles.
> Let's imagine that we have a collection of elements artificially timestamped
> every 10 seconds (for simplicity of the example) and a fixed windowing of 1
> minute. Then each window contains 6 elements. If we were to buffer the
> elements by batches of 5 elements, then for each window we expect to get 2
> batches (one of 5 elements, one of 1 element). For that to happen, we need a
> @OnWindowExpiration on the DoFn where we call perBatchFn
>
>> As a composite transform this will likely require a group by key which may
>> affect performance. Maybe within a dofn is better.
>
> Yes, the processing is done with a DoFn indeed.

However, without a GBK it is unclear which key state would be stored
with respect to. (On that note, one should be able to batch across
keys, which makes using the state API as is difficult.)

>> Then it could be some annotation or API that informs the runner. Should
>> batch sizes be fixed in the annotation (element count or size) or should
>> the user have some method that lets them decide when to process a batch
>> based on the contents?
>
> For now, the user passes batchSize as an argument to BatchParDo.via(); it is
> a number of elements. But batching based on content might be useful for the
> user. Giving a hint to the runner might be more flexible for the runner. Thanks.

We should allow for runners to tune this parameter. We should also
allow for time-based batch expiration.
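
[Editor's sketch] Combining a size bound with time-based expiration amounts to a flush predicate like the following (FlushPolicy is illustrative, not a Beam API): a runner could evaluate the size condition per element and the timeout condition from a processing-time timer.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative size-or-timeout flush condition for a batching transform.
class FlushPolicy {
    final int maxSize;
    final Duration maxDelay;

    FlushPolicy(int maxSize, Duration maxDelay) {
        this.maxSize = maxSize;
        this.maxDelay = maxDelay;
    }

    // Flush when the batch is full, or when the oldest buffered element
    // has already waited maxDelay or longer. An empty buffer never flushes.
    boolean shouldFlush(int bufferedCount, Instant oldestBuffered, Instant now) {
        if (bufferedCount == 0) {
            return false;
        }
        return bufferedCount >= maxSize
                || !now.isBefore(oldestBuffered.plus(maxDelay));
    }
}
```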

>> Another thing to think about is whether this should be connected to the
>> ability to run parts of the bundle in parallel.
>
> Yes!

This is, in some sense, a "sliding batch" but many of the concerns
(e.g. holding the watermark, outputting with the correct timestamps
and windows) are similar. The semantics of 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Eugene Kirpichov
Hi Etienne,

Could you post some snippets of how your transform is to be used in a
pipeline? I think that would make it easier to discuss on this thread and
could save a lot of churn if the discussion ends up leading to a different
API.

On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot 
wrote:

> Wonderful !
>
> Thanks Kenn !
>
> Etienne
>
>
> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> > Hi Etienne,
> >
> > I was drafting a proposal about @OnWindowExpiration when this email
> > arrived. I thought I would try to quickly unblock you by responding with
> a
> > TL;DR: you can achieve your goals with state & timers as they currently
> > exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
> > precisely - when this timer fires, you are guaranteed that the input
> > watermark has exceeded this point (so all new data is droppable) while
> the
> > output timestamp is held to this point (so you can safely output into the
> > window).
> >
> > @OnWindowExpiration is (1) a convenience to save you from needing a
> handle
> > on the allowed lateness (not a problem in your case) and (2) actually
> > meaningful and potentially less expensive to implement in the absence of
> > state (this is why it needs a design discussion at all, really).
> >
> > Caveat: these APIs are new and not supported in every runner and
> windowing
> > configuration.
> >
> > Kenn
> >
> > On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot 
> > wrote:
> >
> >> Hi,
> >>
> >> I have started to implement this ticket. For now it is implemented as a
> >> PTransform that simply does ParDo.of(new DoFn) and all the processing
> >> related to batching is done in the DoFn.
> >>
> >> I'm starting to deal with windows and bundles (starting to take a look
> at
> >> the State API for cross-bundle processing, more questions about this to
> come).
> >> My comments/questions are inline:
> >>
> >>
> >> Le 17/01/2017 à 18:41, Ben Chambers a écrit :
> >>
> >>> We should start by understanding the goals. If elements are in
> different
> >>> windows can they be in the same batch? If they have different
> >>> timestamps what timestamp should the batch have?
> >>>
> >> Regarding timestamps: the current design is as follows: the transform does not
> >> group elements in the PCollection, so the "batch" does not exist as an
> >> element in the PCollection. There is only a user defined function
> >> (perBatchFn) that gets called when batchSize elements have been
> processed.
> >> This function takes an ArrayList as parameter. So elements keep their
> >> original timestamps
> >>
> >>
> >> Regarding windowing: I guess that if elements are not in the same
> window,
> >> they are not expected to be in the same batch.
> >> I'm just starting to work on these subjects, so I might lack a bit of
> >> information;
> >> what I am currently thinking about is that I need a way to know in the
> >> DoFn that the window has expired so that I can call the perBatchFn even
> if
> >> batchSize is not reached.  This is the @OnWindowExpiration callback that
> >> Kenneth mentioned in an email about bundles.
> >> Let's imagine that we have a collection of elements artificially
> >> timestamped every 10 seconds (for simplicity of the example) and a fixed
> >> windowing of 1 minute. Then each window contains 6 elements. If we were
> to
> >> buffer the elements by batches of 5 elements, then for each window we
> >> expect to get 2 batches (one of 5 elements, one of 1 element). For that
> to
> >> happen, we need a @OnWindowExpiration on the DoFn where we call
> perBatchFn
> >>
> >> As a composite transform this will likely require a group by key which
> may
> >>> affect performance. Maybe within a dofn is better.
> >>>
> >> Yes, the processing is done with a DoFn indeed.
> >>
> >>> Then it could be some annotation or API that informs the runner. Should
> >>> batch sizes be fixed in the annotation (element count or size) or
> should
> >>> the user have some method that lets them decide when to process a batch
> >>> based on the contents?
> >>>
> >> For now, the user passes batchSize as an argument to BatchParDo.via();
> >> it is a number of elements. But batching based on content might be useful
> >> for the user. Giving a hint to the runner might be more flexible for the
> >> runner. Thanks.
> >>
> >>> Another thing to think about is whether this should be connected to the
> >>> ability to run parts of the bundle in parallel.
> >>>
> >> Yes!
> >>
> >>> Maybe each batch is an RPC
> >>> and you just want to start an async RPC for each batch. Then in
> addition
> >>> to
> >>> starting the final RPC in finishBundle, you also need to wait for all the
> >>> RPCs
> >>> to complete.
> >>>
> >> Actually, currently each batch processing is whatever the user wants
> >> (perBatchFn user defined function). If the user decides to issue an
> async
> >> RPC in that function (call with the arrayList of input elements), IMHO
> he
> >> is responsible for waiting for the 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Etienne Chauchot

Wonderful !

Thanks Kenn !

Etienne


Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding with a
TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
precisely - when this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while the
output timestamp is held to this point (so you can safely output into the
window).

@OnWindowExpiration is (1) a convenience to save you from needing a handle
on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence of
state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and windowing
configuration.
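The timer arithmetic Kenn describes can be modeled in plain Java with millisecond longs standing in for Beam's Instant/Duration (illustrative only, no Beam dependency; note that in Beam an interval window's maxTimestamp() is its end minus one millisecond):

```java
// Illustrative model of the timer target described above:
// fire at window.maxTimestamp().plus(allowedLateness).
class WindowExpiration {
    // In Beam, an IntervalWindow's maxTimestamp is its end minus 1 ms.
    static long maxTimestamp(long windowEndMillis) {
        return windowEndMillis - 1;
    }

    static long expirationTimer(long windowEndMillis, long allowedLatenessMillis) {
        return maxTimestamp(windowEndMillis) + allowedLatenessMillis;
    }

    // Once the input watermark has passed the timer, any new data for the
    // window is droppable, while the output hold still allows outputting
    // into the window.
    static boolean isExpired(long watermarkMillis, long windowEndMillis,
                             long allowedLatenessMillis) {
        return watermarkMillis > expirationTimer(windowEndMillis, allowedLatenessMillis);
    }
}
```

For a one-minute fixed window starting at epoch with 5 s allowed lateness, the timer fires at 64 999 ms.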

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot 
wrote:


Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at
the State API for cross-bundle processing, more questions about this to come).
My comments/questions are inline:


Le 17/01/2017 à 18:41, Ben Chambers a écrit :


We should start by understanding the goals. If elements are in different
windows can they be in the same batch? If they have different
timestamps what timestamp should the batch have?


Regarding timestamps: the current design is as follows: the transform does not
group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user defined function
(perBatchFn) that gets called when batchSize elements have been processed.
This function takes an ArrayList as parameter. So elements keep their
original timestamps


Regarding windowing: I guess that if elements are not in the same window,
they are not expected to be in the same batch.
I'm just starting to work on these subjects, so I might lack a bit of
information;
what I am currently thinking about is that I need a way to know in the
DoFn that the window has expired so that I can call the perBatchFn even if
batchSize is not reached.  This is the @OnWindowExpiration callback that
Kenneth mentioned in an email about bundles.
Let's imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a fixed
windowing of 1 minute. Then each window contains 6 elements. If we were to
buffer the elements by batches of 5 elements, then for each window we
expect to get 2 batches (one of 5 elements, one of 1 element). For that to
happen, we need a @OnWindowExpiration on the DoFn where we call perBatchFn
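The window/batch arithmetic in this example can be checked with a few lines of plain Java (illustrative helper, not part of the proposed transform):

```java
import java.util.ArrayList;
import java.util.List;

// For N elements in a window and a batch size B, the window produces
// ceil(N / B) batches: full batches of B, plus a final partial batch
// that must be flushed when the window expires.
class BatchesPerWindow {
    static List<Integer> batchSizes(int elementsInWindow, int batchSize) {
        List<Integer> sizes = new ArrayList<>();
        int remaining = elementsInWindow;
        while (remaining > 0) {
            sizes.add(Math.min(batchSize, remaining));
            remaining -= batchSize;
        }
        return sizes;
    }
}
```

With 6 elements per window and batches of 5, this yields two batches of sizes 5 and 1, matching the example.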

As a composite transform this will likely require a group by key which may

affect performance. Maybe within a dofn is better.


Yes, the processing is done with a DoFn indeed.


Then it could be some annotation or API that informs the runner. Should
batch sizes be fixed in the annotation (element count or size) or should
the user have some method that lets them decide when to process a batch
based on the contents?


For now, the user passes batchSize as an argument to BatchParDo.via(); it
is a number of elements. But batching based on content might be useful for the
user. Giving a hint to the runner might be more flexible for the runner. Thanks.


Another thing to think about is whether this should be connected to the
ability to run parts of the bundle in parallel.


Yes!


Maybe each batch is an RPC
and you just want to start an async RPC for each batch. Then in addition
to
starting the final RPC in finishBundle, you also need to wait for all the
RPCs
to complete.


Actually, currently each batch processing is whatever the user wants
(perBatchFn user defined function). If the user decides to issue an async
RPC in that function (call with the arrayList of input elements), IMHO he
is responsible for waiting for the response in that method if he needs the
response, but he can also fire and forget, depending on his use case.
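The wait-for-all-RPCs concern raised above can be sketched with CompletableFuture: start one future per batch, keep the handles, and join them all in the finishBundle-equivalent before the bundle may be committed (stdlib Java; the "RPC" here is a stand-in, not a real client call):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative: one async "RPC" per batch; finishBundle() must wait for
// every outstanding future before the bundle is considered done.
class AsyncBatchSender {
    private final List<CompletableFuture<Integer>> pending = new ArrayList<>();

    // Stand-in for a real async client call; here it just computes the
    // batch size asynchronously.
    void sendBatch(List<String> batch) {
        pending.add(CompletableFuture.supplyAsync(batch::size));
    }

    // Equivalent of waiting in finishBundle: block until all RPCs complete.
    List<Integer> finishBundle() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        List<Integer> results = new ArrayList<>();
        for (CompletableFuture<Integer> f : pending) {
            results.add(f.join());
        }
        pending.clear();
        return results;
    }
}
```

A fire-and-forget variant would simply skip the join, at the cost of losing failure visibility before the bundle commits.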

Besides, I have also included a perElementFn user function to allow the
user to do some processing on the elements before adding them to the batch
(example use case: convert a String to a DTO object to invoke an external
service)
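The two user hooks described here can be modeled in stdlib Java (names mirror the thread's perElementFn/perBatchFn, not a published Beam API): perElementFn transforms each element before it is buffered, and perBatchFn receives each full batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative model of the proposed transform's two user hooks:
// perElementFn pre-processes each element (e.g. String -> DTO),
// perBatchFn consumes each full batch (e.g. one service call per batch).
class BatchingFn<InputT, OutputT> {
    private final int batchSize;
    private final Function<InputT, OutputT> perElementFn;
    private final Consumer<List<OutputT>> perBatchFn;
    private final List<OutputT> buffer = new ArrayList<>();

    BatchingFn(int batchSize,
               Function<InputT, OutputT> perElementFn,
               Consumer<List<OutputT>> perBatchFn) {
        this.batchSize = batchSize;
        this.perElementFn = perElementFn;
        this.perBatchFn = perBatchFn;
    }

    void processElement(InputT element) {
        buffer.add(perElementFn.apply(element));
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Stand-in for the flush done at window expiration / finishBundle.
    void flush() {
        if (!buffer.isEmpty()) {
            perBatchFn.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

The String-to-DTO use case from the thread maps directly onto `perElementFn`, with `perBatchFn` making the external call.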

Etienne

On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot

wrote:

Hi JB,

I meant jira vote but discussion on the ML works also :)

As I understand the need (see stackoverflow links in jira ticket) the
aim is to avoid the user having to code the batching logic in his own
DoFn.processElement() and DoFn.finishBundle() regardless of the bundles.
For example, possible use case is to batch a call to an external service
(for performance).

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Jean-Baptiste Onofré

Fantastic !

Let me take a look on the Spark runner ;)

Thanks !
Regards
JB

On 01/26/2017 03:34 PM, Kenneth Knowles wrote:

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding with a
TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
precisely - when this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while the
output timestamp is held to this point (so you can safely output into the
window).

@OnWindowExpiration is (1) a convenience to save you from needing a handle
on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence of
state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and windowing
configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot 
wrote:


Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at
the State API for cross-bundle processing, more questions about this to come).
My comments/questions are inline:


Le 17/01/2017 à 18:41, Ben Chambers a écrit :


We should start by understanding the goals. If elements are in different
windows can they be in the same batch? If they have different
timestamps what timestamp should the batch have?



Regarding timestamps: the current design is as follows: the transform does not
group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user defined function
(perBatchFn) that gets called when batchSize elements have been processed.
This function takes an ArrayList as parameter. So elements keep their
original timestamps


Regarding windowing: I guess that if elements are not in the same window,
they are not expected to be in the same batch.
I'm just starting to work on these subjects, so I might lack a bit of
information;
what I am currently thinking about is that I need a way to know in the
DoFn that the window has expired so that I can call the perBatchFn even if
batchSize is not reached.  This is the @OnWindowExpiration callback that
Kenneth mentioned in an email about bundles.
Let's imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a fixed
windowing of 1 minute. Then each window contains 6 elements. If we were to
buffer the elements by batches of 5 elements, then for each window we
expect to get 2 batches (one of 5 elements, one of 1 element). For that to
happen, we need a @OnWindowExpiration on the DoFn where we call perBatchFn

As a composite transform this will likely require a group by key which may

affect performance. Maybe within a dofn is better.


Yes, the processing is done with a DoFn indeed.


Then it could be some annotation or API that informs the runner. Should
batch sizes be fixed in the annotation (element count or size) or should
the user have some method that lets them decide when to process a batch
based on the contents?


For now, the user passes batchSize as an argument to BatchParDo.via(); it
is a number of elements. But batching based on content might be useful for the
user. Giving a hint to the runner might be more flexible for the runner. Thanks.


Another thing to think about is whether this should be connected to the
ability to run parts of the bundle in parallel.


Yes!


Maybe each batch is an RPC
and you just want to start an async RPC for each batch. Then in addition
to
starting the final RPC in finishBundle, you also need to wait for all the
RPCs
to complete.


Actually, currently each batch processing is whatever the user wants
(perBatchFn user defined function). If the user decides to issue an async
RPC in that function (call with the arrayList of input elements), IMHO he
is responsible for waiting for the response in that method if he needs the
response, but he can also fire and forget, depending on his use case.

Besides, I have also included a perElementFn user function to allow the
user to do some processing on the elements before adding them to the batch
(example use case: convert a String to a DTO object to invoke an external
service)

Etienne

On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot

wrote:

Hi JB,

I meant jira vote but discussion on the ML works also :)

As I understand the need (see stackoverflow links in jira ticket) the
aim is to avoid the user having to code the batching logic in his own
DoFn.processElement() and DoFn.finishBundle() regardless of the bundles.
For example, possible use case is to batch a call to