Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Yes, there is now a new PTransform called GroupIntoBatches.

Best,
Etienne

On 11/07/2017 at 02:38, Robert Bradshaw wrote:
> Sorry, just saw https://github.com/apache/beam/pull/2211
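For context on the transform mentioned above: GroupIntoBatches batches the values of a keyed PCollection into groups of at most a given size, using state and timers under the hood. A minimal plain-Java sketch of the per-key grouping semantics (no Beam dependency; the class and method names here are ours, not Beam's):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchSketch {
    // Group values per key into batches of at most batchSize, preserving order.
    static Map<String, List<List<Integer>>> batchPerKey(
            List<Map.Entry<String, Integer>> input, int batchSize) {
        Map<String, List<List<Integer>>> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> kv : input) {
            List<List<Integer>> batches =
                    out.computeIfAbsent(kv.getKey(), k -> new ArrayList<>());
            // Start a new batch when there is none yet or the last one is full.
            if (batches.isEmpty()
                    || batches.get(batches.size() - 1).size() == batchSize) {
                batches.add(new ArrayList<>());
            }
            batches.get(batches.size() - 1).add(kv.getValue());
        }
        return out;
    }
}
```

The real transform additionally uses an end-of-window timer to flush incomplete batches, which is exactly the mechanism debated in the thread below.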
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Sorry, just saw https://github.com/apache/beam/pull/2211

On Mon, Jul 10, 2017 at 5:37 PM, Robert Bradshaw wrote:
> Any progress on this?
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Any progress on this?

On Thu, Mar 9, 2017 at 1:43 AM, Etienne Chauchot wrote:
> Hi all,
>
> We had a discussion with Kenn yesterday about point 1 below; I would like to note it here on the ML.
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Hi all,

We had a discussion with Kenn yesterday about point 1 below; I would like to note it here on the ML:

Using the new method timer.set() instead of timer.setForNowPlus() makes the timer fire at the right time.

Another thing, regarding point 2: if I inject the window in the @OnTimer method and print it, I see that at the moment the timer fires (at the last timestamp of the window), the window is the GlobalWindow. I guess that is because the fixed window has just ended. Maybe the empty bagState that I get here is due to the end of the window (passing to the GlobalWindow). I mean, as the states are scoped per window, and the window is different, another bagState instance gets injected. Hence the empty bagState.

WDYT?

I will open a PR even if this work is not finished yet; that way, we will have a convenient environment for discussing this code.

Etienne

On 03/03/2017 at 11:48, Etienne Chauchot wrote:

Hi all,

@Kenn: I have enhanced my streaming test in https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO, in particular to check that BatchingParDo doesn't mess up windows. It seems that it actually does :)

The input collection contains 10 elements timestamped at 1 s intervals, and it is divided into fixed windows of 5 s duration (so 2 windows). startTime is epoch. The timer is used to detect the end of the window and output the content of the batch (buffer) then.
I added some logs and I noticed two strange things (that might be linked):

1. The timer is set twice, and it is set correctly:

  INFOS: * SET TIMER * Delay of 4999 ms added to timestamp 1970-01-01T00:00:00.000Z set for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
  INFOS: * SET TIMER * Delay of 4999 ms added to timestamp 1970-01-01T00:00:05.000Z set for window [1970-01-01T00:00:05.000Z..1970-01-01T00:00:10.000Z)

It correctly fires twice, but not at the right timestamp:

  INFOS: * END OF WINDOW * for timer timestamp 1970-01-01T00:00:04.999Z  => correct
  INFOS: * END OF WINDOW * for timer timestamp 1970-01-01T00:00:04.999Z  => incorrect (should fire at timestamp 1970-01-01T00:00:09.999Z)

Do I need to call timer.cancel() after the timer has fired? But timer.cancel() is not supported by DirectRunner yet.

2. In the @OnTimer method, the injected batch bagState parameter is empty, whereas elements were added to it since the last batch.clear() while processing the same window:

  INFOS: * BATCH * clear
  INFOS: * BATCH * Add element for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
  INFOS: * BATCH * Add element for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
  ..
  INFOS: * END OF WINDOW * for timer timestamp 1970-01-01T00:00:04.999Z
  INFOS: * IN ONTIMER * batch size 0

Am I doing something wrong with timers, or is there something not totally finished with them (as you noticed, they are quite new)?

WDYT?

Thanks

Etienne
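The 4999 ms delay and the :04.999Z / :09.999Z firing timestamps in the logs above follow from end-of-window timers firing at window.maxTimestamp(), i.e. one millisecond before the window end. A small plain-Java sketch of that arithmetic (no Beam dependency; the method names are ours):

```java
public class WindowMath {
    // Start of the fixed window (of the given size) containing the timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Equivalent of Beam's window.maxTimestamp(): the last timestamp inside
    // the window, i.e. windowEnd - 1 ms. End-of-window timers fire here.
    static long maxTimestamp(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis - 1;
    }
}
```

With 5 s windows starting at epoch, an element at 3 s maps to maxTimestamp 4999 ms and an element at 7 s to 9999 ms, matching the two timers set in the logs.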
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Hi,

@JB: good to know for the roadmap! Thanks.

@Kenn: just to be clear: the timer fires fine. What I noticed is that it seems to be SET more than once, because timer.setForNowPlus is called in the @ProcessElement method. I am not 100% sure of it; what I noticed is that it started to work fine when I ensured that timer.setForNowPlus was called only once. I don't say it's a bug; this is just not what I understood when I read the javadoc. I understood that it would be set only once per window, see the javadoc below:

> An implementation of Timer is implicitly scoped - it may be scoped to a key and window, or a key, window, and trigger, etc.
> A timer exists in one of two states: set or unset. A timer can be set only for a single time per scope.

I use the DirectRunner.

For the key part: ok, makes sense.

Thanks for your comments. I'm leaving on vacation tonight for a little more than two weeks; I'll resume work then, and maybe start a PR when it's ready.

Etienne

On 08/02/2017 at 19:48, Kenneth Knowles wrote:

Hi Etienne,

If the timer is firing n times for n elements, that's a bug in the runner / shared runner code. It should be deduped. Which runner? Can you file a JIRA against me to investigate? I'm still in the process of fleshing out more and more RunnableOnService (aka ValidatesRunner) tests, so I will surely add one (existing tests already OOMed without deduping, so it wasn't at the top of my priority list).

If the end user doesn't have a natural key, I would just add one and remove it within your transform. Not sure how easy this will be - you might need user intervention. Of course, you still do need to shard or you'll be processing the whole PCollection serially.

Kenn

On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré wrote:

Hi

AFAIR the timer per function is in the "roadmap" (remembering a discussion we had with Kenn). I will take a deeper look next week at your branch.
Regards
JB

On Feb 8, 2017, at 13:28, Etienne Chauchot wrote:

Hi Kenn,

I have started using the state and timer APIs; they seem awesome! Please take a look at https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
It contains a PTransform that does the batching across bundles while respecting the windows (even if tests are not finished yet, see @Ignore and TODOs).

I have some questions:

- I use the timer to detect the end of the window like you suggested. But the timer can only be set in @ProcessElement and @OnTimer. The javadoc says that timers are implicitly scoped to a key/window and that a timer can be set only for a single time per scope. I noticed that if I call timer.setForNowPlus in the @ProcessElement method, it seems that the timer is set n times for n elements. So I just created a state with a boolean to prevent setting the timer more than once per key/window. => Would it maybe be good to have an end-user way of indicating that the timer should be set only once per key/window? Something analogous to @Setup, to avoid the user having to use a boolean state.

- I understand that state and timers need to be per-key, but what if the end user does not need a key (let's say he just needs a plain PCollection)? Then, do we tell him to use a keyed PCollection anyway, like I wrote in the javadoc of BatchingParDo?

WDYT?

Thanks,
Etienne

On 26/01/2017 at 17:28, Etienne Chauchot wrote:

Wonderful! Thanks Kenn!

Etienne

On 26/01/2017 at 15:34, Kenneth Knowles wrote:

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email arrived. I thought I would try to quickly unblock you by responding with a TL;DR: you can achieve your goals with state & timers as they currently exist.

You'll set a timer for window.maxTimestamp().plus(allowedLateness) precisely - when this timer fires, you are guaranteed that the input watermark has exceeded this point (so all new data is droppable), while the output timestamp is held to this point (so you can safely output into the window).
@OnWindowExpiration is (1) a convenience to save you from needing a handle on the allowed lateness (not a problem in your case) and (2) actually meaningful and potentially less expensive to implement in the absence of state (this is why it needs a design discussion at all, really). Caveat: these APIs are new and not supported in every runner and windowing configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot wrote:

Hi,

I have started to implement this ticket. For now it is implemented as a PTransform that simply does ParDo.of(new DoFn), and all the processing related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at the State API to process across bundles; more questions about this to come). My comments/questions are inline:

On 17/01/2017 at 18:41, Ben Chambers wrote:

We should start by understanding the goals. If elements are in different windows, can they be put in the same batch? If they have different timestamps, what timestamp should the batch have?
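Etienne's workaround described above (a boolean state guarding the timer so it is set only once per key/window) can be sketched outside Beam as follows. This is a hypothetical simulation with names of our own choosing; in Beam the map below would be a ValueState<Boolean> scoped implicitly by the runner:

```java
import java.util.HashMap;
import java.util.Map;

public class TimerGuard {
    // Simulated per-(key, window) boolean state.
    private final Map<String, Boolean> timerSet = new HashMap<>();
    private int timersSet = 0;

    // Called once per element; sets the end-of-window timer only the
    // first time a (key, window) pair is seen.
    void processElement(String key, long windowStart) {
        String scope = key + "@" + windowStart;
        if (!timerSet.getOrDefault(scope, false)) {
            timersSet++;              // stand-in for timer.set(windowMaxTimestamp)
            timerSet.put(scope, true);
        }
    }

    int timersSetCount() {
        return timersSet;
    }
}
```

Per Kenn's remark earlier in the thread, a conforming runner should dedupe repeated sets of the same timer anyway, so this guard is a belt-and-braces measure rather than a requirement.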
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Hi,

On 27/01/2017 at 19:44, Robert Bradshaw wrote:

> On Fri, Jan 27, 2017 at 6:55 AM, Etienne Chauchot wrote:
>> Hi Robert,
>>
>> On 26/01/2017 at 18:17, Robert Bradshaw wrote:
>>> First off, let me say that a *correctly* batching DoFn is a lot of value, especially because it's (too) easy to (often unknowingly) implement it incorrectly.
>>
>> I definitely agree; I put a similar comment in another email. As an example, I recall a comment from someone on Stack Overflow who said that he would have forgotten to flush the batch in finishBundle.
>>
>>> My take is that a BatchingParDo should be a PTransform<PCollection<InputT>, PCollection<OutputT>> that takes a DoFn<Iterable<InputT>, ? extends Iterable<OutputT>> as a parameter, as well as some (optional?) batching criteria (probably batch size and/or batch timeout).
>>
>> This is how I implemented it, plus another per-element function that produces an intermediary type, to allow the user to use another type than InputType in perBatchFn (for example, to convert elements to DTOs and call an external service in perBatchFn using the DTOs), or to do any other per-element computation before adding elements to the batch.
>
> I think we should omit the perElement as part of this transform, as that can be done immediately prior to this one without any loss of generality or utility. One can always wrap this composition in a new PTransform if desired.

You're right, it is simpler to let the user do it as a pipeline step; I'll remove the perElementFn.

>> Besides, I used SimpleFunctions:
>>
>> SimpleFunction perElementFn;
>> SimpleFunction perBatchFn;
>>
>> The input ArrayList in perBatchFn is the buffer of elements.
>
> We should be as general as possible, e.g. SimpleFunction<Iterable<InputT>, ? extends Iterable<OutputT>>.

Yes, sure, I've updated it.

> Again, letting this be a DoFn rather than a SimpleFunction allows for things such as setup, teardown, side inputs, etc., but forces complicated delegation, so this is probably a fine start.

Yes, actually, I hesitated; I have opted for the simpler one as a start :) I guess, as the list of possible use cases grows, we might change to DoFn to leverage its possibilities.
> The DoFn should map the set of inputs to a set of outputs of the same size and in the same order as the input (or, possibly, an empty list would be acceptable). Semantically, it should be defined as
>
>   public PCollection<OutputT> expand(PCollection<InputT> input) {
>     return input
>         .apply(e -> SingletonList.of(e))
>         .apply(parDo(batchDoFn))
>         .apply(es -> Iterables.onlyElement(es));
>   }
>
> Getting this correct wrt timestamps and windowing is tricky. However, even something that handles the most trivial case (e.g. GlobalWindows only) and degenerates to batch sizes of 1 for other cases would allow people to start using this code (rather than rolling their own), and we could then continue to refine it.

Yes, sure; right now the code handles only the global window case. This is the very beginning, I'm still in the simple naive approach (no window and no cross-bundle buffering support).

> +1. We should assert on construction that the windowing is global. Even in the global window case, we'll want to avoid mangling element timestamps.

I plan to use the state API to buffer across bundles and the timer API (as Kenn pointed out) to detect the end of the window in the DoFn.

> Makes sense. It'd be nice if we could figure out a way to do this across keys (and windows, when the batch computation isn't sensitive to this, of course).

Thanks for your comments, Robert.

> Glad to help. Thanks for taking this on.
>
> - Robert

Etienne
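Robert's contract above - the batch DoFn must return one output per input, in the same order, so that batching is semantically invisible - can be illustrated with a small plain-Java sketch (the helper name applyBatched is ours, not from Beam):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BatchContract {
    // Apply a per-batch function (Iterable of inputs -> List of outputs) to
    // the input in chunks of batchSize, concatenating the results. If the
    // batch function honors the same-size/same-order contract, the result is
    // identical to applying it element by element, whatever batchSize is.
    static <I, O> List<O> applyBatched(
            List<I> input, int batchSize, Function<List<I>, List<O>> batchFn) {
        List<O> out = new ArrayList<>();
        for (int i = 0; i < input.size(); i += batchSize) {
            List<I> batch = input.subList(i, Math.min(i + batchSize, input.size()));
            List<O> result = batchFn.apply(batch);
            if (result.size() != batch.size()) {
                throw new IllegalStateException("batch fn broke the size contract");
            }
            out.addAll(result);
        }
        return out;
    }
}
```

This is why the transform can "degenerate to batch sizes of 1" in the non-global-window cases and still be correct: batch size is an optimization knob, not part of the observable semantics.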
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
It makes sense with the PTransform: less invasive, but the user has to define two functions (one perElement, the other perBatch). I like the DoFn approach with annotations, and I would do the same for the batch. The "trigger/window" is a key part as well, as even if the batch is not complete, the perBatch Fn might be called.

Regards
JB

On 01/27/2017 03:55 PM, Etienne Chauchot wrote:
> On 26/01/2017 at 18:17, Robert Bradshaw wrote:
>> More responses inline below.
>>
>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot wrote:
>>> My comments/questions are inline:
>>>
>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
>>>> We should start by understanding the goals. If elements are in different windows, can they be put in the same batch? If they have different timestamps, what timestamp should the batch have?
>>>
>>> Regarding timestamps: currently the design is as follows: the transform does not group elements in the PCollection, so the "batch" does not exist as an element in the PCollection. There is only a user-defined function (perBatchFn) that gets called when batchSize elements have been processed. This function takes an ArrayList as a parameter. So elements keep their original timestamps.
>>
>> Correct, elements must keep their original timestamps. This is one reason @OnWindowExpiration is insufficient. The watermark needs to be held back to the timestamp of the earliest element in the buffer.
>>
>>> Regarding windowing: I guess that if elements are not in the same window, they are not expected to be in the same batch.
>>
>> Batching should be possible across windows, as long as the innerBatchDoFn does not take the Window (or window-dependent side inputs) as parameters. Note in particular, if there is ever non-trivial windowing, after a GBK each successive element is almost certainly in a different window from its predecessor, which would make emitting after each window change useless.
>>
>>> I'm just starting to work on these subjects, so I might lack a bit of information; what I am currently thinking about is that I need a way to know in the DoFn that the window has expired, so that I can call the perBatchFn even if batchSize is not reached.
>>
>> This is the @OnWindowExpiration callback that Kenneth mentioned in an email about bundles.
>>
>>> Let's imagine that we have a collection of elements artificially timestamped every 10 seconds (for simplicity of the example) and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements by batches of 5
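The scenario Etienne describes just above (elements every 10 s, 1-minute fixed windows, batch size 5) can be simulated in plain Java to show why a window-expiration hook is needed: in each window, the first five elements flush on size, and the sixth must be flushed when the window expires. This is a hypothetical sketch with names of our own; it is not the Beam API:

```java
import java.util.ArrayList;
import java.util.List;

public class WindowedBatcher {
    // Record of emitted batch sizes, in order.
    final List<Integer> flushes = new ArrayList<>();
    private final List<Long> buffer = new ArrayList<>();
    private final int batchSize;
    private final long windowSizeMillis;
    private long currentWindowStart = -1;

    WindowedBatcher(int batchSize, long windowSizeMillis) {
        this.batchSize = batchSize;
        this.windowSizeMillis = windowSizeMillis;
    }

    void processElement(long timestampMillis) {
        long windowStart = timestampMillis - (timestampMillis % windowSizeMillis);
        // Stand-in for an end-of-window timer / @OnWindowExpiration firing:
        // flush the leftover buffer when a new window begins.
        if (windowStart != currentWindowStart && !buffer.isEmpty()) {
            flush();
        }
        currentWindowStart = windowStart;
        buffer.add(timestampMillis);
        if (buffer.size() == batchSize) {
            flush();               // normal flush on reaching batchSize
        }
    }

    void onWindowExpiration() {    // final flush for the last open window
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        flushes.add(buffer.size());
        buffer.clear();
    }
}
```

Feeding it twelve elements 10 s apart yields flushes of 5, 1, 5, 1: without the expiration hook, the trailing element of each window would sit in the buffer forever (or leak into the next window's batch).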
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
On Fri, Jan 27, 2017 at 6:55 AM, Etienne Chauchot wrote: > Hi Robert, > > On 26/01/2017 at 18:17, Robert Bradshaw wrote: >> >> First off, let me say that a *correctly* batching DoFn is a lot of >> value, especially because it's (too) easy to (often unknowingly) >> implement it incorrectly. > > I definitely agree, I put a similar comment in another email. As an example > I recall a comment of someone on Stack Overflow who said that he would have > forgotten to flush the batch in finishBundle. >> >> My take is that a BatchingParDo should be a PTransform > PCollection> that takes a DoFn, ? extends >> Iterable> as a parameter, as well as some (optional?) batching >> criteria (probably batch size and/or batch timeout). > > This is how I implemented it plus another perElement function that produces > an intermediary type to allow the user to use another type than InputType in > perBatchFn (for example, convert elements to DTOs and call an external service in > perBatchFn using DTOs) or to do any other per-element computation before > adding elements to the batch. I think we should omit the perElement as part of this transform as that can be done immediately prior to this one without any loss of generality or utility. One can always wrap this composition in a new PTransform if desired. > Besides I used SimpleFunctions > > SimpleFunction perElementFn; > SimpleFunction perBatchFn; > > The input ArrayList in perBatchFn is the buffer of elements. We should be as general as possible, e.g. SimpleFunction, ? extends Iterable>. Again, letting this be a DoFn rather than SimpleFunction allows for things such as setup, teardown, side inputs, etc. but forces complicated delegation so this is probably a fine start. >> The DoFn should >> map the set of inputs to a set of outputs of the same size and in the >> same order as the input (or, possibly, an empty list would be >> acceptable). 
Semantically, it should be defined as >> >> public expand(PCollection input) { >>return input >> .apply(e -> SingletonList.of(e)) >> .apply(parDo(batchDoFn)) >> .apply(es -> Iterables.onlyElement(es)); >> } >> >> Getting this correct wrt timestamps and windowing is tricky. However, >> even something that handles the most trivial case (e.g. GlobalWindows >> only) and degenerates to batch sizes of 1 for other cases would allow >> people to start using this code (rather than rolling their own) and we >> could then continue to refine it. > > Yes sure, right now the code handles only the global window case. This is > the very beginning, I'm still in the simple naive approach (no window and no > buffering trans-bundle support), +1. We should assert on construction that the windowing is global. Even in the global window case, we'll want to avoid mangling element timestamps. > I plan to use state API to buffer > trans-bundle and timer API (as Kenn pointed) to detect the end of the window > in the DoFn. Makes sense. It'd be nice if we could figure out a way to do this across keys (and windows, when the batch computation isn't sensitive to this of course). > Thanks for your comments Robert. Glad to help. Thanks for taking this on. - Robert
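Robert's semantic definition above can be made concrete with a Beam-agnostic plain-Java sketch (names like `viaBatch`/`viaSingletons` are illustrative, not Beam API): a correct batching function must produce the same result whether applied to a whole batch or element-by-element to singleton lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BatchSemantics {
    // Applies batchFn to the whole input as one batch; the contract requires
    // outputs of the same size and order as the inputs.
    static <I, O> List<O> viaBatch(List<I> input, Function<List<I>, List<O>> batchFn) {
        List<O> out = batchFn.apply(input);
        if (out.size() != input.size()) {
            throw new IllegalStateException("batch fn must be 1:1 in size and order");
        }
        return out;
    }

    // Reference semantics from Robert's email: wrap each element in a
    // singleton list, apply batchFn, and unwrap the only element.
    static <I, O> List<O> viaSingletons(List<I> input, Function<List<I>, List<O>> batchFn) {
        List<O> out = new ArrayList<>();
        for (I e : input) {
            List<O> one = batchFn.apply(Collections.singletonList(e));
            if (one.size() != 1) throw new IllegalStateException("expected exactly one output");
            out.add(one.get(0));
        }
        return out;
    }

    public static void main(String[] args) {
        Function<List<Integer>, List<String>> fn =
            batch -> batch.stream().map(i -> "v" + i).collect(Collectors.toList());
        List<Integer> input = Arrays.asList(1, 2, 3);
        // Both paths must agree for a correctly batching DoFn.
        System.out.println(viaBatch(input, fn));       // [v1, v2, v3]
        System.out.println(viaSingletons(input, fn));  // [v1, v2, v3]
    }
}
```

The equivalence of the two paths is exactly what makes the batching an internal optimization rather than a semantic change.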
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Hi, Indeed, I did not want to be invasive in DoFn either. So I chose to implement it as a PTransform. Please be aware that it is just the very beginning, it is still in the simple naive approach (no window and no buffering trans-bundle support); I plan to use the state API to buffer trans-bundle and the timer API (as Kenn pointed out) to detect the end of the window in the DoFn. Here is the branch; it is too early to show it but it could provide a base for discussions as Eugene said. https://github.com/echauchot/beam/commits/BEAM-135-BATCHING-PARDO take a look at BatchingParDo and BatchingParDoTest. You will find a user snippet as pseudo-code in the javadoc of BatchingParDo. There is also client code in the test but it is not close to a use case; it just allows testing the inner DoFn. Thanks guys. Etienne On 27/01/2017 at 00:00, Robert Bradshaw wrote: On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov wrote: I don't think we should make batching a core feature of the Beam programming model (by adding it to DoFn as this code snippet implies). I'm reasonably sure there are less invasive ways of implementing it. +1, either as a PTransform or a DoFn that wraps/delegates to a DoFn . On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré wrote: Agree, I'm curious as well. I guess it would be something like: .apply(ParDo(new DoFn() { @Override public long batchSize() { return 1000; } @ProcessElement public void processElement(ProcessContext context) { ... } })); If batchSize (overridden by the user) returns a positive long, then DoFn can batch with this size. Regards JB On 01/26/2017 05:38 PM, Eugene Kirpichov wrote: Hi Etienne, Could you post some snippets of how your transform is to be used in a pipeline? I think that would make it easier to discuss on this thread and could save a lot of churn if the discussion ends up leading to a different API. On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot wrote: Wonderful ! Thanks Kenn ! 
Etienne On 26/01/2017 at 15:34, Kenneth Knowles wrote: Hi Etienne, I was drafting a proposal about @OnWindowExpiration when this email arrived. I thought I would try to quickly unblock you by responding with a TL;DR: you can achieve your goals with state & timers as they currently exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness) precisely - when this timer fires, you are guaranteed that the input watermark has exceeded this point (so all new data is droppable) while the output timestamp is held to this point (so you can safely output into the window). @OnWindowExpiration is (1) a convenience to save you from needing a handle on the allowed lateness (not a problem in your case) and (2) actually meaningful and potentially less expensive to implement in the absence of state (this is why it needs a design discussion at all, really). Caveat: these APIs are new and not supported in every runner and windowing configuration. Kenn On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot
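Kenn's firing point can be checked with plain-millisecond arithmetic (a Beam-agnostic sketch; in Beam these are Instant/Duration values and BoundedWindow.maxTimestamp()): for a half-open window [start, end), the max timestamp is end minus 1 ms, which is why the timer logs earlier in the thread show a [0s, 5s) window firing at 4999 ms.

```java
public class WindowTimerMath {
    // maxTimestamp of a half-open window [start, end) is end - 1 ms,
    // mirroring what Beam's BoundedWindow.maxTimestamp() returns.
    static long maxTimestampMillis(long windowEndMillis) {
        return windowEndMillis - 1;
    }

    // Event-time timer target per Kenn's recipe:
    // window.maxTimestamp().plus(allowedLateness)
    static long expirationTimerMillis(long windowEndMillis, long allowedLatenessMillis) {
        return maxTimestampMillis(windowEndMillis) + allowedLatenessMillis;
    }

    public static void main(String[] args) {
        // Fixed 5s windows from epoch, zero allowed lateness, as in the
        // DirectRunner logs quoted above:
        System.out.println(expirationTimerMillis(5_000, 0));   // 4999
        System.out.println(expirationTimerMillis(10_000, 0));  // 9999
    }
}
```

Once the input watermark passes this instant, every buffered element of that window can safely be flushed.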
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Hi Robert, On 26/01/2017 at 18:17, Robert Bradshaw wrote: First off, let me say that a *correctly* batching DoFn is a lot of value, especially because it's (too) easy to (often unknowingly) implement it incorrectly. I definitely agree, I put a similar comment in another email. As an example I recall a comment of someone on Stack Overflow who said that he would have forgotten to flush the batch in finishBundle. My take is that a BatchingParDo should be a PTransform that takes a DoFn, ? extends Iterable> as a parameter, as well as some (optional?) batching criteria (probably batch size and/or batch timeout). This is how I implemented it plus another perElement function that produces an intermediary type to allow the user to use another type than InputType in perBatchFn (for example, convert elements to DTOs and call an external service in perBatchFn using DTOs) or to do any other per-element computation before adding elements to the batch. Besides I used SimpleFunctions SimpleFunction perElementFn; SimpleFunction perBatchFn; The input ArrayList in perBatchFn is the buffer of elements. The DoFn should map the set of inputs to a set of outputs of the same size and in the same order as the input (or, possibly, an empty list would be acceptable). Semantically, it should be defined as public expand(PCollection input) { return input .apply(e -> SingletonList.of(e)) .apply(parDo(batchDoFn)) .apply(es -> Iterables.onlyElement(es)); } Getting this correct wrt timestamps and windowing is tricky. However, even something that handles the most trivial case (e.g. GlobalWindows only) and degenerates to batch sizes of 1 for other cases would allow people to start using this code (rather than rolling their own) and we could then continue to refine it. Yes sure, right now the code handles only the global window case. 
This is the very beginning, I'm still in the simple naive approach (no window and no buffering trans-bundle support); I plan to use the state API to buffer trans-bundle and the timer API (as Kenn pointed out) to detect the end of the window in the DoFn. Thanks for your comments Robert. More responses inline below. On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot wrote: Hi, I have started to implement this ticket. For now it is implemented as a PTransform that simply does ParDo.of(new DoFn) and all the processing related to batching is done in the DoFn. I'm starting to deal with windows and bundles (starting to take a look at the State API to process trans-bundles, more questions about this to come). My comments/questions are inline: On 17/01/2017 at 18:41, Ben Chambers wrote: We should start by understanding the goals. If elements are in different windows can they be put in the same batch? If they have different timestamps what timestamp should the batch have? Regarding timestamps: currently the design is as follows: the transform does not group elements in the PCollection, so the "batch" does not exist as an element in the PCollection. There is only a user-defined function (perBatchFn) that gets called when batchSize elements have been processed. This function takes an ArrayList as a parameter. So elements keep their original timestamps. Correct, elements must keep their original timestamps. This is one reason @OnWindowExpiration is insufficient. The watermark needs to be held back to the timestamp of the earliest element in the buffer. Regarding windowing: I guess that if elements are not in the same window, they are not expected to be in the same batch. Batching should be possible across windows, as long as the innerBatchDoFn does not take the Window (or window-dependent side inputs) as parameters. 
Note in particular, if there is ever non-trivial windowing, after a GBK each successive element is almost certainly in a different window from its predecessor, which would make emitting after each window change useless. I'm just starting to work on these subjects, so I might lack a bit of information; what I am currently thinking about is that I need a way to know in the DoFn that the window has expired so that I can call the perBatchFn even if batchSize is not reached. This is the @OnWindowExpiration callback that Kenneth mentioned in an email about bundles. Let's imagine that we have a collection of elements artificially timestamped every 10 seconds (for simplicity of the example) and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements by batches of 5 elements, then for each window we expect to get 2 batches (one of 5 elements, one of 1 element). For that to happen, we need a @OnWindowExpiration on the DoFn where we call perBatchFn. As a composite transform this will likely require a group by key which may affect performance. Maybe within a DoFn is better. Yes, the processing is done with a DoFn indeed. However, without a GBK it is unclear
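The per-window arithmetic in the example above (6 elements per 1-minute window, batch size 5, hence one batch of 5 and one of 1) can be sketched Beam-agnostically; `intoBatches` is a hypothetical helper, not a Beam API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowBatches {
    // Splits one window's buffered elements into batches of at most batchSize.
    // The final, possibly short, batch corresponds to the flush that
    // @OnWindowExpiration (or an end-of-window timer) would trigger.
    static <T> List<List<T>> intoBatches(List<T> windowBuffer, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < windowBuffer.size(); i += batchSize) {
            batches.add(new ArrayList<>(
                windowBuffer.subList(i, Math.min(i + batchSize, windowBuffer.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        // 6 elements timestamped every 10s fill one 1-minute window.
        List<Integer> window = Arrays.asList(0, 10, 20, 30, 40, 50);
        List<List<Integer>> batches = intoBatches(window, 5);
        System.out.println(batches.size());         // 2
        System.out.println(batches.get(0).size());  // 5
        System.out.println(batches.get(1).size());  // 1
    }
}
```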
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
On Thu, Jan 26, 2017 at 6:58 PM, Kenneth Knowles wrote: > On Thu, Jan 26, 2017 at 4:15 PM, Robert Bradshaw < > rober...@google.com.invalid> wrote: > >> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov >> wrote: >> > >> > you can't wrap DoFn's, period >> >> As a simple example, given a DoFn it's perfectly natural to want >> to "wrap" this as a DoFn , KV >. State, side inputs, >> windows, etc. would just be passed through. > > >> The fact that this is complicated, with reflection and flexible >> signatures and byte generation, is a property of the SDK (to provide a >> flexible DoFn API). I agree that it's nice to hide this complexity >> from the user, and it discourages this kind of composability. >> > > > The difficulty of this sort of composability is a particularly bothersome > issue for DoFn. It is solvable but the solutions may seem esoteric. > > - Supporting wrapped _invocation_ is actually as easy as before if we > choose to embrace it: ArgumentProvider is roughly the same thing as ye olde > ProcessContext. We can easily provide it via our parameter mechanism, and > DoFnInvokers can be used or we can also provide a DoFnInvoker for some > requested DoFn. > > - Supporting wrapped analysis is a bit uglier but I don't think there are > technical blockers. It was actually quite bad prior to the new DoFn - you > couldn't know statically whether the wrapper class had to "implements > RequiresWindowAccess" so you would just have to do it "just in case". With > the new DoFn I could imagine a `@Wraps public DoFn getWrapped()` though > we'd still have to be able to merge this with whatever the wrapper requires > - for example if they both use state there will be nothing calling out the > fact that the wrapper needs to create a disjoint namespace. 
We could even > make this required if there is an ArgumentProvider parameter and/or > automatically provide a DoFnInvoker for this DoFn (so DoFnInvoker would be > in the core SDK and DoFnInvokers would be only in the SDK Fn harness). > > So I think the analysis problems are fundamental, not part of the > particulars of the new DoFn API or any particular SDK. > My point is that the DoFn as a concept in the Beam model is fundamentally, though not perfectly, composable. Both invocation and analysis are functions of the SDK, and solvable, though perhaps not easily (and/or requiring what would normally be considered implementation details). > Coming back to the ticket... I'm going to echo Ben's early point, and now > Eugene's and Robert's, that we should enumerate further use cases explicitly > and preferably add them to the JIRA. > > Both SO questions are answered with essentially an inlined PTransform Iterable> with a maximum iterable size, for batched RPCs downstream. You > can easily build timestamped and reified-windows versions without that > transform being aware of it. It is simple to understand, as easy to > implement a unified-model version via state & timers as those SO answers, > and doesn't require DoFn nesting. I think Eugene surfaced the key point, which is that it depends on what one does with the output. Both of these emit "result" in the timestamp and window of whatever the last element of the batch was, regardless of the other elements. Of course they'll both break at runtime (with high probability) for inputs windowed by anything but timestamp-invariant WindowFns like GlobalWindows as the emit in finalize won't have an ambient window or timestamp to assign to its output. This is fine for writes or other terminal nodes without output (though even writes may have output). > I would love to learn more about use > cases that either debunk this or refine it. 
Any time one wants to consume the output in a non-globally-windowed, non-uniform-timestamp way, the SO answers above are broken. In particular, streaming. (Adapting these to use state would restrict batches to be per-window, per-key, and still not respect timestamps.) In the use cases I gave, one follows the batch computation with an "unbatch." Put another way, the batching allows amortization of processing cost for logically independent elements. One would have to do this by reifying windows and timestamps (holding back the timestamp to the earliest member of the batch), storing elements in an accumulator (per-bundle locally or cross-bundle using (per-key-window(?)) state), processing the batch, and then restoring windows and timestamps. However, the user of the API shouldn't have to manually deal with the window-reified elements, i.e. they should provide a List -> List DoFn rather than a List -> List one, which is why I think nesting, or at least wrapping, is required. Any outputs other than 1:0 or 1:1 require more manual thought as to what the window and timestamp of the output should be (or restriction of batches to single windows/timestamps, or rounding up timestamps to the end of
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Hi Eugene, A simple way would be to create a BatchedDoFn in an extension. WDYT? Regards JB On Jan 26, 2017, at 21:48, Eugene Kirpichov wrote: >I don't think we should make batching a core feature of the Beam >programming model (by adding it to DoFn as this code snippet implies). >I'm >reasonably sure there are less invasive ways of implementing it. >On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré >wrote: > >> Agree, I'm curious as well. >> >> I guess it would be something like: >> >> .apply(ParDo(new DoFn() { >> >> @Override >> public long batchSize() { >> return 1000; >> } >> >> @ProcessElement >> public void processElement(ProcessContext context) { >> ... >> } >> })); >> >> If batchSize (overridden by the user) returns a positive long, then DoFn >can >> batch with this size. >> >> Regards >> JB >> >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote: >> > Hi Etienne, >> > >> > Could you post some snippets of how your transform is to be used in >a >> > pipeline? I think that would make it easier to discuss on this >thread and >> > could save a lot of churn if the discussion ends up leading to a >> different >> > API. >> > >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot > >> > wrote: >> > >> >> Wonderful ! >> >> >> >> Thanks Kenn ! >> >> >> >> Etienne >> >> >> >> >> >> On 26/01/2017 at 15:34, Kenneth Knowles wrote: >> >>> Hi Etienne, >> >>> >> >>> I was drafting a proposal about @OnWindowExpiration when this >email >> >>> arrived. I thought I would try to quickly unblock you by >responding >> with >> >> a >> >>> TL;DR: you can achieve your goals with state & timers as they >currently >> >>> exist. You'll set a timer for >> window.maxTimestamp().plus(allowedLateness) >> >>> precisely - when this timer fires, you are guaranteed that the >input >> >>> watermark has exceeded this point (so all new data is droppable) >while >> >> the >> >>> output timestamp is held to this point (so you can safely output >into >> the >> >>> window). 
>> >>> >> >>> @OnWindowExpiration is (1) a convenience to save you from needing >a >> >> handle >> >>> on the allowed lateness (not a problem in your case) and (2) >actually >> >>> meaningful and potentially less expensive to implement in the >absence >> of >> >>> state (this is why it needs a design discussion at all, really). >> >>> >> >>> Caveat: these APIs are new and not supported in every runner and >> >> windowing >> >>> configuration. >> >>> >> >>> Kenn >> >>> >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot > > > >> >>> wrote: >> >>> >> Hi, >> >> I have started to implement this ticket. For now it is >implemented as >> a >> PTransform that simply does ParDo.of(new DoFn) and all the >processing >> related to batching is done in the DoFn. >> >> I'm starting to deal with windows and bundles (starting to take >a look >> >> at >> the State API to process trans-bundles, more questions about >this to >> >> come). >> My comments/questions are inline: >> >> >> Le 17/01/2017 à 18:41, Ben Chambers a écrit : >> >> > We should start by understanding the goals. If elements are in >> >> different >> > windows can they be out in the same batch? If they have >different >> > timestamps what timestamp should the batch have? >> > >> Regarding timestamps: currently design is as so: the transform >does >> not >> group elements in the PCollection, so the "batch" does not exist >as an >> element in the PCollection. There is only a user defined >function >> (perBatchFn) that gets called when batchSize elements have been >> >> processed. >> This function takes an ArrayList as parameter. So elements keep >their >> original timestamps >> >> >> Regarding windowing: I guess that if elements are not in the >same >> >> window, >> they are not expected to be in the same batch. 
>> I'm just starting to work on these subjects, so I might lack a >bit of >> information; >> what I am currently thinking about is that I need a way to know >in the >> DoFn that the window has expired so that I can call the >perBatchFn >> even >> >> if >> batchSize is not reached. This is the @OnWindowExpiration >callback >> that >> Kenneth mentioned in an email about bundles. >> Lets imagine that we have a collection of elements artificially >> timestamped every 10 seconds (for simplicity of the example) and >a >> fixed >> windowing of 1 minute. Then each window contains 6 elements. If >we >> were >> >> to >> buffer the elements by batches of 5 elements, then for each >window we >> expect to get 2 batches (one of 5 elements, one of 1 element). >For >> that >> >> to >> append, we need a @OnWindowExpiration on the DoFn where we call >> >> perBatchFn >> >> As a composite transform
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
On Thu, Jan 26, 2017 at 4:15 PM, Robert Bradshaw < rober...@google.com.invalid> wrote: > On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov > wrote: > > > > you can't wrap DoFn's, period > > As a simple example, given a DoFn it's perfectly natural to want > to "wrap" this as a DoFn , KV >. State, side inputs, > windows, etc. would just be passed through. > The fact that this is complicated, with reflection and flexible > signatures and byte generation, is a property of the SDK (to provide a > flexible DoFn API). I agree that it's nice to hide this complexity > from the user, and it discourages this kind of composability. > The difficulty of this sort of composability is a particularly bothersome issue for DoFn. It is solvable but the solutions may seem esoteric. - Supporting wrapped _invocation_ is actually as easy as before if we choose to embrace it: ArgumentProvider is roughly the same thing as ye olde ProcessContext. We can easily provide it via our parameter mechanism, and DoFnInvokers can be used or we can also provide a DoFnInvoker for some requested DoFn. - Supporting wrapped analysis is a bit uglier but I don't think there are technical blockers. It was actually quite bad prior to the new DoFn - you couldn't know statically whether the wrapper class had to "implements RequiresWindowAccess" so you would just have to do it "just in case". With the new DoFn I could imagine a `@Wraps public DoFn getWrapped()` though we'd still have to be able to merge this with whatever the wrapper requires - for example if they both use state there will be nothing calling out the fact that the wrapper needs to create a disjoint namespace. We could even make this required if there is an ArgumentProvider parameter and/or automatically provide a DoFnInvoker for this DoFn (so DoFnInvoker would be in the core SDK and DoFnInvokers would be only in the SDK Fn harness). 
So I think the analysis problems are fundamental, not part of the particulars of the new DoFn API or any particular SDK. Coming back to the ticket... I'm going to echo Ben's early point, and now Eugene's and Robert's, that we should enumerate further use cases explicitly and preferably add them to the JIRA. Both SO questions are answered with essentially an inlined PTransform with a maximum iterable size, for batched RPCs downstream. You can easily build timestamped and reified-windows versions without that transform being aware of it. It is simple to understand, as easy to implement a unified-model version via state & timers as those SO answers, and doesn't require DoFn nesting. I would love to learn more about use cases that either debunk this or refine it. Also, one transform does not need to serve all uses; we just need it to serve its intended use properly and try not to tempt misuse. Focusing briefly on the windowing issues, the outside world is globally windowed. So in those communications the data is in the global window whether you want it to be or not. IMO, with rare exceptions, rewindowing to the global window silently (such as by making an RPC ignoring the window) is data loss (or maybe just well-documented data discarding :-). So window-aware write transforms (with the special case of the global window being "already ready") are a good idea from the start. It probably makes sense to have consistently windowed output from a window-aware write, so the above transform should operate per-window or else reify windows, batch in the global window, then restore windows (requires BEAM-1287 or a WindowFn). Kenn
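The reify/batch/restore shape Kenn describes can be sketched Beam-agnostically: pair each element with its timestamp and window before batching, and restore the pair afterwards, so the batch step never has to invent a window or timestamp. `ValueInWindow` and `batchPreservingWindows` are hypothetical stand-ins for Beam's windowed-value machinery, not actual Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ReifyAndRestore {
    // Hypothetical stand-in for a Beam windowed value: element plus metadata.
    static final class ValueInWindow<T> {
        final T value;
        final long timestampMillis;
        final String windowId;
        ValueInWindow(T value, long timestampMillis, String windowId) {
            this.value = value;
            this.timestampMillis = timestampMillis;
            this.windowId = windowId;
        }
    }

    // Applies a 1:1 batch function to the bare values, then restores each
    // result into the window and timestamp of its corresponding input, so
    // the batch function itself stays window-unaware.
    static <I, O> List<ValueInWindow<O>> batchPreservingWindows(
            List<ValueInWindow<I>> input, Function<List<I>, List<O>> batchFn) {
        List<I> bare = new ArrayList<>();
        for (ValueInWindow<I> v : input) bare.add(v.value);
        List<O> out = batchFn.apply(bare);  // amortized work over the whole batch
        List<ValueInWindow<O>> restored = new ArrayList<>();
        for (int i = 0; i < out.size(); i++) {
            ValueInWindow<I> in = input.get(i);
            restored.add(new ValueInWindow<>(out.get(i), in.timestampMillis, in.windowId));
        }
        return restored;
    }

    public static void main(String[] args) {
        List<ValueInWindow<Integer>> input = new ArrayList<>();
        input.add(new ValueInWindow<>(1, 100L, "w1"));
        input.add(new ValueInWindow<>(2, 200L, "w2"));
        List<ValueInWindow<String>> out = batchPreservingWindows(input, batch -> {
            List<String> results = new ArrayList<>();
            for (Integer i : batch) results.add("v" + i);
            return results;
        });
        for (ValueInWindow<String> v : out) {
            System.out.println(v.value + " @ " + v.timestampMillis + " in " + v.windowId);
        }
    }
}
```

In a real pipeline the watermark would also have to be held to the earliest buffered timestamp, which this local sketch cannot show.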
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
On Thu, Jan 26, 2017 at 4:20 PM, Ben Chambers wrote: > Here's an example API that would make this part of a DoFn. The idea here is > that it would still be run as `ParDo.of(new MyBatchedDoFn())`, but the > runner (and DoFnRunner) could see that it has asked for batches, so rather > than calling a `processElement` on every input `I`, it assembles a > `Collection` and then calls the method. > > Possible API making this part of DoFn (with a fixed size): > > public MyBatchedDoFn extends DoFn { > @ProcessBatch(size = 50) > public void processBatch(ProcessContext c) { > Collection batchContents = c.element(); > ... > } > } > > Possible API making this part of DoFn (with dynamic size): > > public MyBatchedDoFn extends DoFn { > @ProcessBatch > public boolean processBatch(ProcessContext c) { > Collection batchContents = c.element(); > if (batchContents.size() < 50) { > return false; // batch not yet processed > } > > ... > return true; > } > } Or even public MyBatchedDoFn extends DoFn { public void processElement(Iterable batch) { [process the batch] } } though I'd rather this not be baked into the DoFn API if it can be solved separately.
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
The third option for batching: - Functionality within the DoFn and DoFnRunner built as part of the SDK. I haven't thought through Batching, but at least for the IntraBundleParallelization use case this actually does make sense to expose as a part of the model. Knowing that a DoFn supports parallelization, a runner may want to control how much parallelization is allowed, and the DoFn also needs to make sure to wait on all those threads (and make sure they're properly set up for logging/metrics/etc. associated with the current step). There may be good reasons to make this a property of a DoFn that the runner can inspect, and support. For instance, if a DoFn wants to process batches of 50, it may be possible to factor that into how input is split/bundled. On Thu, Jan 26, 2017 at 3:49 PM Kenneth Knowles wrote: > On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov < > kirpic...@google.com.invalid> wrote: > > > The class for invoking DoFn's, > > DoFnInvokers, is absent from the SDK (and present in runners-core) for a > > good reason. > > > > This would be true if it weren't for that pesky DoFnTester :-) > > And even if we solve that problem, in the future it will be in the SDK's Fn > Harness. >
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov < kirpic...@google.com.invalid> wrote: > The class for invoking DoFn's, > DoFnInvokers, is absent from the SDK (and present in runners-core) for a > good reason. > This would be true if it weren't for that pesky DoFnTester :-) And even if we solve that problem, in the future it will be in the SDK's Fn Harness.
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
I agree that wrapping the DoFn is probably not the way to go, because the DoFn may be quite tricky due to all the reflective features: e.g. how do you automatically "batch" a DoFn that uses state and timers? What about a DoFn that uses a BoundedWindow parameter? What about a splittable DoFn? What about future reflective features? The class for invoking DoFn's, DoFnInvokers, is absent from the SDK (and present in runners-core) for a good reason. I'd rather leave the intricacies of invoking DoFn's to runners, and say that you can't wrap DoFn's, period - "adapter", "decorator" and other design patterns just don't apply to DoFn's. The two options for batching are: - A transform that takes elements and produces batches, like Robert said - A simple Beam-agnostic library that takes Java objects and produces batches of Java objects, with an API that makes it convenient to use in a typical batching DoFn On Thu, Jan 26, 2017 at 3:31 PM Ben Chambers wrote: > I think that wrapping the DoFn is tricky -- we backed out > IntraBundleParallelization because it did that, and it has weird > interactions with both the reflective DoFn and windowing. We could maybe > make some kind of "DoFnDelegatingDoFn" that could act as a base class and > get some of that right, but... > > One question I have is whether this batching should be "make batches of N > and if you need to wait for the Nth element do so" or "make batches of at > most N but don't wait too long if you don't get to N". In the former case, > we'll need to do something to buffer elements between bundles -- whether > this is using State or a GroupByKey, etc. In the latter case, the buffering > can happen entirely within a bundle -- if you get to the end of the bundle > and only have 5 elements, even if 5 < N, process that as a batch (rather > than shifting it somewhere else). 
> > On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw > > wrote: > > > On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov > > wrote: > > > I don't think we should make batching a core feature of the Beam > > > programming model (by adding it to DoFn as this code snippet implies). > > I'm > > > reasonably sure there are less invasive ways of implementing it. > > > > +1, either as a PTransform or a DoFn that > > wraps/delegates to a DoFn . > > > > > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré > > > > wrote: > > > > > >> Agree, I'm curious as well. > > >> > > >> I guess it would be something like: > > >> > > >> .apply(ParDo(new DoFn() { > > >> > > >> @Override > > >> public long batchSize() { > > >> return 1000; > > >> } > > >> > > >> @ProcessElement > > >> public void processElement(ProcessContext context) { > > >> ... > > >> } > > >> })); > > >> > > >> If batchSize (overrided by user) returns a positive long, then DoFn > can > > >> batch with this size. > > >> > > >> Regards > > >> JB > > >> > > >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote: > > >> > Hi Etienne, > > >> > > > >> > Could you post some snippets of how your transform is to be used in > a > > >> > pipeline? I think that would make it easier to discuss on this > thread > > and > > >> > could save a lot of churn if the discussion ends up leading to a > > >> different > > >> > API. > > >> > > > >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot < > echauc...@gmail.com > > > > > >> > wrote: > > >> > > > >> >> Wonderful ! > > >> >> > > >> >> Thanks Kenn ! > > >> >> > > >> >> Etienne > > >> >> > > >> >> > > >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit : > > >> >>> Hi Etienne, > > >> >>> > > >> >>> I was drafting a proposal about @OnWindowExpiration when this > email > > >> >>> arrived. I thought I would try to quickly unblock you by > responding > > >> with > > >> >> a > > >> >>> TL;DR: you can achieve your goals with state & timers as they > > currently > > >> >>> exist. 
You'll set a timer for > > >> window.maxTimestamp().plus(allowedLateness) > > >> >>> precisely - when this timer fires, you are guaranteed that the > input > > >> >>> watermark has exceeded this point (so all new data is droppable) > > while > > >> >> the > > >> >>> output timestamp is held to this point (so you can safely output > > into > > >> the > > >> >>> window). > > >> >>> > > >> >>> @OnWindowExpiration is (1) a convenience to save you from needing > a > > >> >> handle > > >> >>> on the allowed lateness (not a problem in your case) and (2) > > actually > > >> >>> meaningful and potentially less expensive to implement in the > > absence > > >> of > > >> >>> state (this is why it needs a design discussion at all, really). > > >> >>> > > >> >>> Caveat: these APIs are new and not supported in every runner and > > >> >> windowing > > >> >>> configuration. > > >> >>> > > >> >>> Kenn > > >> >>> > > >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot < > >
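Eugene's second option, a simple Beam-agnostic batching library, could look roughly like this (the `Batcher` name and shape are illustrative assumptions, not from any actual Beam module); the explicit `flush()` is exactly the step that is "too easy to forget" in `finishBundle`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class Batcher<T> {
    private final int maxSize;
    private final Consumer<List<T>> onBatch;
    private final List<T> buffer = new ArrayList<>();

    public Batcher(int maxSize, Consumer<List<T>> onBatch) {
        this.maxSize = maxSize;
        this.onBatch = onBatch;
    }

    // Call from @ProcessElement: emits a batch once maxSize elements are buffered.
    public void add(T element) {
        buffer.add(element);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    // Call from @FinishBundle: emits whatever is buffered, even a short batch.
    public void flush() {
        if (!buffer.isEmpty()) {
            onBatch.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        Batcher<Integer> batcher = new Batcher<>(3, batch -> System.out.println("batch: " + batch));
        for (int i = 1; i <= 7; i++) {
            batcher.add(i);
        }
        batcher.flush();  // final short batch of one element
    }
}
```

This matches the "batches of at most N within a bundle" semantics from Ben and Robert's exchange: adding 7 elements with `maxSize` 3 yields two full batches and, on `flush()`, one batch of a single element.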
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
On Thu, Jan 26, 2017 at 3:31 PM, Ben Chambers wrote:
> I think that wrapping the DoFn is tricky -- we backed out
> IntraBundleParallelization because it did that, and it has weird
> interactions with both the reflective DoFn and windowing. We could maybe
> make some kind of "DoFnDelegatingDoFn" that could act as a base class and
> get some of that right, but...

Yeah, this is a lot trickier with NewDoFn. Which is unfortunate, as this isn't the only case where we want to make DoFns more composable.

> One question I have is whether this batching should be "make batches of N
> and if you need to wait for the Nth element do so" or "make batches of at
> most N but don't wait too long if you don't get to N". In the former case,
> we'll need to do something to buffer elements between bundles -- whether
> this is using State or a GroupByKey, etc. In the latter case, the buffering
> can happen entirely within a bundle -- if you get to the end of the bundle
> and only have 5 elements, even if 5 < N, process that as a batch (rather
> than shifting it somewhere else).

I think "make batches of at most N but don't wait too long if you don't get to N" is a very useful first (and tractable) start that can be built on.
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
I think that wrapping the DoFn is tricky -- we backed out IntraBundleParallelization because it did that, and it has weird interactions with both the reflective DoFn and windowing. We could maybe make some kind of "DoFnDelegatingDoFn" that could act as a base class and get some of that right, but...

One question I have is whether this batching should be "make batches of N and if you need to wait for the Nth element, do so" or "make batches of at most N but don't wait too long if you don't get to N". In the former case, we'll need to do something to buffer elements between bundles -- whether this is using State or a GroupByKey, etc. In the latter case, the buffering can happen entirely within a bundle -- if you get to the end of the bundle and only have 5 elements, even if 5 < N, process that as a batch (rather than shifting it somewhere else).
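The "at most N, flush the remainder at the end of the bundle" buffering described above can be sketched in plain Java, independently of the Beam DoFn APIs. This is only an illustration of the semantics; the class and method names are hypothetical, loosely mirroring @ProcessElement and @FinishBundle:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch: buffer elements and emit batches of at most
// maxBatchSize, flushing whatever remains when the "bundle" ends,
// even if the final batch is smaller than N.
class BundleBatcher<T> {
  private final int maxBatchSize;
  private final Consumer<List<T>> perBatchFn;
  private List<T> buffer = new ArrayList<>();

  BundleBatcher(int maxBatchSize, Consumer<List<T>> perBatchFn) {
    this.maxBatchSize = maxBatchSize;
    this.perBatchFn = perBatchFn;
  }

  // Analogous to @ProcessElement: add one element, flush on a full batch.
  void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  // Analogous to @FinishBundle: emit the remainder, even if size < N.
  void finishBundle() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    perBatchFn.accept(buffer);
    buffer = new ArrayList<>();  // hand off the old list, start a fresh one
  }
}
```

Note this keeps all buffering within a single bundle, so no State or GroupByKey is needed; the trade-off, as discussed above, is that batches may be short whenever bundles are.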
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov wrote:
> I don't think we should make batching a core feature of the Beam
> programming model (by adding it to DoFn as this code snippet implies). I'm
> reasonably sure there are less invasive ways of implementing it.

+1, either as a PTransform or a DoFn that wraps/delegates to a DoFn.
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
I don't think we should make batching a core feature of the Beam programming model (by adding it to DoFn as this code snippet implies). I'm reasonably sure there are less invasive ways of implementing it.

On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré wrote:
> Agree, I'm curious as well.
>
> I guess it would be something like:
>
>   .apply(ParDo(new DoFn() {
>
>     @Override
>     public long batchSize() {
>       return 1000;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext context) {
>       ...
>     }
>   }));
>
> If batchSize (overridden by the user) returns a positive long, then the DoFn
> can batch with this size.
>
> Regards
> JB
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Agree, I'm curious as well.

I guess it would be something like:

  .apply(ParDo(new DoFn() {

    @Override
    public long batchSize() {
      return 1000;
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      ...
    }
  }));

If batchSize (overridden by the user) returns a positive long, then the DoFn can batch with this size.

Regards
JB

On 01/26/2017 05:38 PM, Eugene Kirpichov wrote: Hi Etienne, Could you post some snippets of how your transform is to be used in a pipeline? I think that would make it easier to discuss on this thread and could save a lot of churn if the discussion ends up leading to a different API. On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot wrote: Wonderful ! Thanks Kenn ! Etienne On 26/01/2017 at 15:34, Kenneth Knowles wrote: Hi Etienne, I was drafting a proposal about @OnWindowExpiration when this email arrived. I thought I would try to quickly unblock you by responding with a TL;DR: you can achieve your goals with state & timers as they currently exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness) precisely - when this timer fires, you are guaranteed that the input watermark has exceeded this point (so all new data is droppable) while the output timestamp is held to this point (so you can safely output into the window). @OnWindowExpiration is (1) a convenience to save you from needing a handle on the allowed lateness (not a problem in your case) and (2) actually meaningful and potentially less expensive to implement in the absence of state (this is why it needs a design discussion at all, really). Caveat: these APIs are new and not supported in every runner and windowing configuration. Kenn On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot wrote: Hi, I have started to implement this ticket. For now it is implemented as a PTransform that simply does ParDo.of(new DoFn) and all the processing related to batching is done in the DoFn.
I'm starting to deal with windows and bundles (starting to take a look at the State API to process trans-bundles, more questions about this to come). My comments/questions are inline:

On 17/01/2017 at 18:41, Ben Chambers wrote: We should start by understanding the goals. If elements are in different windows can they be put in the same batch? If they have different timestamps what timestamp should the batch have?

Regarding timestamps: the current design is as follows: the transform does not group elements in the PCollection, so the "batch" does not exist as an element in the PCollection. There is only a user defined function (perBatchFn) that gets called when batchSize elements have been processed. This function takes an ArrayList as parameter. So elements keep their original timestamps.

Regarding windowing: I guess that if elements are not in the same window, they are not expected to be in the same batch. I'm just starting to work on these subjects, so I might lack a bit of information; what I am currently thinking about is that I need a way to know in the DoFn that the window has expired so that I can call the perBatchFn even if batchSize is not reached. This is the @OnWindowExpiration callback that Kenneth mentioned in an email about bundles. Let's imagine that we have a collection of elements artificially timestamped every 10 seconds (for simplicity of the example) and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements by batches of 5 elements, then for each window we expect to get 2 batches (one of 5 elements, one of 1 element). For that to happen, we need a @OnWindowExpiration on the DoFn where we call perBatchFn.

As a composite transform this will likely require a group by key which may affect performance. Maybe within a dofn is better.

Yes, the processing is done with a DoFn indeed.

Then it could be some annotation or API that informs the runner.
Should batch sizes be fixed in the annotation (element count or size) or should the user have some method that lets them decide when to process a batch based on the contents?

For now, the user passes batchSize as an argument to BatchParDo.via(); it is a number of elements. But batching based on content might be useful for the user. Giving a hint to the runner might be more flexible. Thanks.

Another thing to think about is whether this should be connected to the ability to run parts of the bundle in parallel.

Yes!

Maybe each batch is an RPC and you just want to start an async RPC for each batch. Then in addition to starting the final RPC in finishBundle, you also need to wait for all the RPCs to complete.

Actually, currently each batch processing is whatever the user wants (the perBatchFn user defined function). If the user decides to issue an async RPC in that function (called with the ArrayList of input elements), IMHO he is responsible for waiting for the response in that method if he
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
First off, let me say that a *correctly* batching DoFn is a lot of value, especially because it's (too) easy to (often unknowingly) implement it incorrectly. My take is that a BatchingParDo should be a PTransform that takes a DoFn<Iterable<T>, ? extends Iterable<O>> as a parameter, as well as some (optional?) batching criteria (probably batch size and/or batch timeout). The DoFn should map the set of inputs to a set of outputs of the same size and in the same order as the input (or, possibly, an empty list would be acceptable). Semantically, it should be defined as

  public expand(PCollection input) {
    return input
        .apply(e -> SingletonList.of(e))
        .apply(parDo(batchDoFn))
        .apply(es -> Iterables.onlyElement(es));
  }

Getting this correct wrt timestamps and windowing is tricky. However, even something that handles the most trivial case (e.g. GlobalWindows only) and degenerates to batch sizes of 1 for other cases would allow people to start using this code (rather than rolling their own) and we could then continue to refine it. More responses inline below.

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot wrote:
> Hi,
>
> I have started to implement this ticket. For now it is implemented as a
> PTransform that simply does ParDo.of(new DoFn) and all the processing
> related to batching is done in the DoFn.
>
> I'm starting to deal with windows and bundles (starting to take a look at
> the State API to process trans-bundles, more questions about this to come).
> My comments/questions are inline:
>
> On 17/01/2017 at 18:41, Ben Chambers wrote:
>> We should start by understanding the goals. If elements are in different
>> windows can they be put in the same batch? If they have different
>> timestamps what timestamp should the batch have?
>
> Regarding timestamps: the current design is as follows: the transform does
> not group elements in the PCollection, so the "batch" does not exist as an
> element in the PCollection.
> There is only a user defined function
> (perBatchFn) that gets called when batchSize elements have been processed.
> This function takes an ArrayList as parameter. So elements keep their
> original timestamps.

Correct, elements must keep their original timestamps. This is one reason @OnWindowExpiration is insufficient. The watermark needs to be held back to the timestamp of the earliest element in the buffer.

> Regarding windowing: I guess that if elements are not in the same window,
> they are not expected to be in the same batch.

Batching should be possible across windows, as long as the innerBatchDoFn does not take the Window (or window-dependent side inputs) as parameters. Note in particular, if there is ever non-trivial windowing, after a GBK each successive element is almost certainly in a different window from its predecessor, which would make emitting after each window change useless.

> I'm just starting to work on these subjects, so I might lack a bit of
> information;
> what I am currently thinking about is that I need a way to know in the DoFn
> that the window has expired so that I can call the perBatchFn even if
> batchSize is not reached. This is the @OnWindowExpiration callback that
> Kenneth mentioned in an email about bundles.
> Let's imagine that we have a collection of elements artificially timestamped
> every 10 seconds (for simplicity of the example) and a fixed windowing of 1
> minute. Then each window contains 6 elements. If we were to buffer the
> elements by batches of 5 elements, then for each window we expect to get 2
> batches (one of 5 elements, one of 1 element). For that to happen, we need a
> @OnWindowExpiration on the DoFn where we call perBatchFn
>
>> As a composite transform this will likely require a group by key which may
>> affect performance. Maybe within a dofn is better.
>
> Yes, the processing is done with a DoFn indeed.

However, without a GBK it is unclear which key state would be stored with respect to.
(On that note, one should be able to batch across keys, which makes using the state API as is difficult.)

>> Then it could be some annotation or API that informs the runner. Should
>> batch sizes be fixed in the annotation (element count or size) or should
>> the user have some method that lets them decide when to process a batch
>> based on the contents?
>
> For now, the user passes batchSize as an argument to BatchParDo.via(); it is
> a number of elements. But batching based on content might be useful for the
> user. Giving a hint to the runner might be more flexible. Thanks.

We should allow for runners to tune this parameter. We should also allow for time-based batch expiration.

>> Another thing to think about is whether this should be connected to the
>> ability to run parts of the bundle in parallel.
>
> Yes!

This is, in some sense, a "sliding batch" but many of the concerns (e.g. holding the watermark, outputting with the correct timestamps and windows) are similar. The semantics of
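The same-size, same-order contract on the batch function described in this message can be illustrated in plain Java. This is not the Beam API, just a hypothetical helper showing why that contract makes batched application semantically equivalent to element-wise application:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch: apply a List->List function over batches of at most
// batchSize elements, preserving input order. Because the batch function
// returns one output per input, in order, the result is indistinguishable
// from calling it on a singleton list per element.
class BatchSemantics {
  static <T, O> List<O> applyInBatches(
      List<T> input, int batchSize, Function<List<T>, List<O>> batchFn) {
    List<O> output = new ArrayList<>();
    for (int i = 0; i < input.size(); i += batchSize) {
      List<T> batch = input.subList(i, Math.min(i + batchSize, input.size()));
      List<O> result = batchFn.apply(batch);
      if (result.size() != batch.size()) {
        // The contract from the message above: same size, same order.
        throw new IllegalStateException("batch function must preserve size");
      }
      output.addAll(result);
    }
    return output;
  }
}
```

A batch function that drops or reorders elements would break the equivalence, which is why the message treats the size/order requirement as part of the transform's definition.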
Re: [BEAM-135] Utilities for "batching" elements in a DoFn
Hi Etienne,

Could you post some snippets of how your transform is to be used in a pipeline? I think that would make it easier to discuss on this thread and could save a lot of churn if the discussion ends up leading to a different API.

On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot wrote:
> [quoted thread trimmed; the same messages appear in full below]
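Since the request above asks for snippets: the thread below describes a DoFn that buffers elements, calls a user callback (perBatchFn) once batchSize elements have been seen, and flushes any partial batch when the window expires. A minimal plain-Java sketch of that buffering logic follows; it uses no Beam APIs, and all names (BatchBuffer, processElement, onWindowExpiration) are illustrative, not the actual PR's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative model of the buffering a batching DoFn would do.
// processElement() mirrors @ProcessElement; onWindowExpiration()
// mirrors the @OnWindowExpiration callback discussed in the thread.
public class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> perBatchFn; // user callback, per the thread
    private final List<T> buffer = new ArrayList<>();

    public BatchBuffer(int batchSize, Consumer<List<T>> perBatchFn) {
        this.batchSize = batchSize;
        this.perBatchFn = perBatchFn;
    }

    // Buffer each element; flush as soon as the batch is full.
    public void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Flush the remaining partial batch when the window ends,
    // so the last elements of a window are not lost.
    public void onWindowExpiration() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        perBatchFn.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With batchSize 5 and 6 elements in a window, this yields one batch of 5 from processElement and one batch of 1 from onWindowExpiration, matching the behavior Etienne describes.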
Wonderful ! Thanks Kenn !

Etienne

On 26/01/2017 at 15:34, Kenneth Knowles wrote:

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email arrived. I thought I would try to quickly unblock you by responding with a TL;DR: you can achieve your goals with state & timers as they currently exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness) precisely - when this timer fires, you are guaranteed that the input watermark has exceeded this point (so all new data is droppable) while the output timestamp is held to this point (so you can safely output into the window).

@OnWindowExpiration is (1) a convenience to save you from needing a handle on the allowed lateness (not a problem in your case) and (2) actually meaningful and potentially less expensive to implement in the absence of state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and windowing configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot wrote:

Hi,

I have started to implement this ticket. For now it is implemented as a PTransform that simply does ParDo.of(new DoFn) and all the processing related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at the State API for cross-bundle processing; more questions about this to come). My comments/questions are inline.

On 17/01/2017 at 18:41, Ben Chambers wrote:

> We should start by understanding the goals. If elements are in different windows can they be in the same batch? If they have different timestamps what timestamp should the batch have?

Regarding timestamps: the current design is as follows: the transform does not group elements in the PCollection, so the "batch" does not exist as an element in the PCollection. There is only a user-defined function (perBatchFn) that gets called when batchSize elements have been processed. This function takes an ArrayList as a parameter, so elements keep their original timestamps.

Regarding windowing: I guess that if elements are not in the same window, they are not expected to be in the same batch. I'm just starting to work on these subjects, so I might lack a bit of information; what I am currently thinking is that I need a way to know in the DoFn that the window has expired, so that I can call perBatchFn even if batchSize is not reached. This is the @OnWindowExpiration callback that Kenneth mentioned in an email about bundles. Let's imagine that we have a collection of elements artificially timestamped every 10 seconds (for simplicity of the example) and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements in batches of 5 elements, then for each window we expect to get 2 batches (one of 5 elements, one of 1 element). For that to happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn.

> As a composite transform this will likely require a group by key which may affect performance. Maybe within a dofn is better.

Yes, the processing is done with a DoFn indeed.

> Then it could be some annotation or API that informs the runner. Should batch sizes be fixed in the annotation (element count or size) or should the user have some method that lets them decide when to process a batch based on the contents?

For now, the user passes batchSize as an argument to BatchParDo.via(); it is a number of elements. But batching based on content might be useful for the user, and giving a hint to the runner might be more flexible for the runner. Thanks.

> Another thing to think about is whether this should be connected to the ability to run parts of the bundle in parallel.

Yes!

> Maybe each batch is an RPC and you just want to start an async RPC for each batch. Then in addition to starting the final RPC in finishBundle, you also need to wait for all the RPCs to complete.

Actually, currently each batch processing step is whatever the user wants (the perBatchFn user-defined function). If the user decides to issue an async RPC in that function (called with the ArrayList of input elements), IMHO he is responsible for waiting for the response in that method if he needs the response, but he can also do a send-and-forget, depending on his use case. Besides, I have also included a perElementFn user function to allow the user to do some processing on the elements before adding them to the batch (an example use case: converting a String to a DTO object to invoke an external service).

Etienne

On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot wrote:

Hi JB,

I meant a jira vote, but discussion on the ML works also :)

As I understand the need (see the stackoverflow links in the jira ticket), the aim is to avoid the user having to code the batching logic in his own DoFn.processElement() and DoFn.finishBundle() regardless of the bundles. For example, a possible use case is to batch a call to an external service (for performance).
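Kenn's TL;DR sets the timer for window.maxTimestamp().plus(allowedLateness). As a concrete check of the arithmetic (assuming, as in Beam, that a window's max timestamp is its end minus 1 ms — the class and method names here are illustrative, not Beam APIs):

```java
// Illustrative arithmetic for the timer rule described in the thread:
// fire time = window.maxTimestamp() + allowedLateness,
// where maxTimestamp is the window end minus 1 ms (inclusive bound).
public class TimerMath {
    public static long maxTimestampMillis(long windowStartMillis, long windowSizeMillis) {
        return windowStartMillis + windowSizeMillis - 1;
    }

    public static long timerFireMillis(long windowStartMillis, long windowSizeMillis,
                                       long allowedLatenessMillis) {
        return maxTimestampMillis(windowStartMillis, windowSizeMillis) + allowedLatenessMillis;
    }
}
```

For 5-second windows starting at epoch with zero allowed lateness, this gives fire times of 4999 ms and 9999 ms for the first two windows, i.e. 00:00:04.999Z and 00:00:09.999Z, which is consistent with the timer timestamps in Etienne's logs.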
Fantastic ! Let me take a look at the Spark runner ;)

Thanks !
Regards
JB

On 01/26/2017 03:34 PM, Kenneth Knowles wrote:
> [quoted thread trimmed; see the same messages above]
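Etienne's worked example earlier in the thread (elements timestamped every 10 seconds, fixed 1-minute windows, batches of 5) can be checked numerically with plain Java. This sketch assigns timestamps to fixed windows and chunks each window's count into batch sizes; it is pure arithmetic with no Beam APIs, and all names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Assigns each timestamp (in seconds) to a fixed window of windowSizeSec,
// then chunks each window's element count into batches of at most batchSize.
public class WindowMath {
    public static List<List<Integer>> batchSizesPerWindow(
            List<Integer> timestampsSec, int windowSizeSec, int batchSize) {
        // Count elements per window, in window order.
        TreeMap<Integer, Integer> countPerWindow = new TreeMap<>();
        for (int ts : timestampsSec) {
            countPerWindow.merge(ts / windowSizeSec, 1, Integer::sum);
        }
        // Chunk each window's count into batch sizes.
        List<List<Integer>> result = new ArrayList<>();
        for (int count : countPerWindow.values()) {
            List<Integer> sizes = new ArrayList<>();
            while (count > 0) {
                sizes.add(Math.min(batchSize, count));
                count -= batchSize;
            }
            result.add(sizes);
        }
        return result;
    }
}
```

Twelve elements at 10-second intervals fall into two 1-minute windows of 6 elements each, and each window yields batches of sizes [5, 1], as the example predicts.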