Re: Committed vs. attempted metrics results

2017-01-26 Thread Aviem Zur
Ben - yes, there is still some ambiguity regarding the querying of the
metrics results.

You've discussed in this thread the notion that metrics step names should
be converted to unique names when aggregating metrics, so that each step
will aggregate its own metrics, and not join with other steps by mistake.
The way you suggested to query this is that when querying by step, all
steps which contain the query string as a substring will show up in the
results.

This sounds fine; however, this is not how the direct runner is implemented
right now - it is an exact match.

In my PR I copied the MetricsResults filtering methods directly from the
direct runner and suggested that since I copied them verbatim perhaps they
should be pulled up to a more central module, and then all runners could
use them.

So, my remaining questions are:
A) Should we create a util for filtering MetricsResults based on a query,
using the filtering implementation suggested in this thread (substring
match)?
B) Should the direct runner be changed to use such a util, and conform to
the suggested results filtering?
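
To make the suggestion concrete, here is a rough sketch of what such a
shared util could look like (names and signatures here are illustrative,
not final):

// Illustrative sketch: keep a result if the queried step name is a
// substring of the metric's full (unique) step name.
public static boolean matchesStep(String actualStep, String queriedStep) {
  return actualStep.contains(queriedStep);
}

// Illustrative sketch: filter query results, keeping those whose step
// matches any of the queried steps.
public static <T> List<MetricResult<T>> filterByStep(
    Iterable<MetricResult<T>> results, Iterable<String> queriedSteps) {
  List<MetricResult<T>> matched = new ArrayList<>();
  for (MetricResult<T> result : results) {
    for (String queried : queriedSteps) {
      if (matchesStep(result.step(), queried)) {
        matched.add(result);
        break;
      }
    }
  }
  return matched;
}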

On Tue, Jan 24, 2017 at 2:03 AM Ben Chambers 
wrote:

> For the short term, it seems like staying with the existing Query API and
> allowing runners to throw exceptions if a user issues a query that is not
> supported is reasonable. It shouldn't affect the ability to run a Beam
> pipeline on other runners, since the Query API is only exposed *after* the
> pipeline is run.
>
> For the longer term, it seems like the Query API could merit some more
> thought, especially if people have good use cases for accessing the value
> of metrics programmatically from the same program that ran the original
> pipeline.
>
> Aviem -- is there anything specific that needs to be discussed/nailed down
> to help with your PR?
>
> On Thu, Jan 19, 2017 at 3:57 PM Ben Chambers  wrote:
>
> > On Thu, Jan 19, 2017 at 3:28 PM Amit Sela  wrote:
> >
> > On Fri, Jan 20, 2017 at 1:18 AM Ben Chambers
> > wrote:
> >
> > > Part of the problem here is that whether attempted or committed is "what
> > > matters" depends on the metric. If you're counting the number of RPCs to an
> > > external service, you may want all the attempts (to predict your bill). If
> > > you're tracking how long those RPCs took, you may want it just to be the
> > > committed (e.g., what is the best-case time to execute your pipeline). This
> > > is essentially Luke's case of wanting one or the other.
> > >
> This sounds like Metrics should have a user-defined guarantee level,
> which might make more sense - Metrics.counter().attempted()/committed() -
> though this might prove more challenging for runners to implement.
> >
> >
> > We went away from that because of cases like Luke's where a user might
> > want to compare the two. Or, not even realize there is a difference
> > up-front, so declaring ahead of time is difficult. If both are available
> > and can be looked at, if they're the same -- no problems. If they're
> > different, then it provides a good reason to investigate and figure out the
> > difference.
> >
> >
> > >
> > > Regarding the step names -- the metrics are reported using the full step
> > > name, which is also made unique during the Graph API. So "My
> > > Composite/ParDo(MyDoFn)" or "My Composite/ParDo(MyDoFn)2" if there are
> > > multiple instances within the same composite. Specifically -- the names are
> > > made unique prior to recording metrics, so there are no double counts.
> > >
> But how would the user know that? I'm afraid this could be confusing as a
> user-facing query API, and I think most users would simply name metrics
> differently.
> >
> >
> > The query API can support querying based on a sub-string of the full name,
> > and return metrics from all steps that match. That would allow the user to
> > query metrics without knowing that. Having unique names for steps is
> > important and useful for many other things (logging, associating time spent
> > executing, etc.).
> >
> >
> > >
> > > On Thu, Jan 19, 2017 at 1:57 PM Amit Sela 
> wrote:
> > >
> > > > I think Luke's example is interesting, but I wonder how common it is/would
> > > > be? I'd expect failures to happen, but not at a rate so dramatic that it
> > > > would be interesting to follow applicatively (I'd expect the
> > > > runner/cluster to properly monitor uptime of processes/nodes separately).
> > > > And even if it is useful, I can't think of other use cases.
> > > >
> > > > I thought the idea was to "declare" the Metrics guarantee level in the
> > > > query API, but the more I think about it the more I tend to let it go for
> > > > the following reasons:
> > > >
> > > >- Setting aside Luke's example, I think users would prefer the best
> > > >guarantee a runner can provide. 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Jean-Baptiste Onofré

Fantastic !

Let me take a look at the Spark runner ;)

Thanks !
Regards
JB

On 01/26/2017 03:34 PM, Kenneth Knowles wrote:

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding with a
TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
precisely - when this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while the
output timestamp is held to this point (so you can safely output into the
window).
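
A rough sketch of that pattern (batch-size bookkeeping elided; the
allowedLateness duration is assumed to be known to the DoFn, and
perBatchFn is the user callback from this thread):

// Sketch only: buffer elements in state and flush when the window expires.
class BufferUntilWindowExpiresFn extends DoFn<KV<String, String>, String> {

  private final Duration allowedLateness = Duration.ZERO; // assumed known

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("expiry") Timer expiryTimer) {
    // Fires once the input watermark passes the end of the window plus lateness.
    expiryTimer.set(window.maxTimestamp().plus(allowedLateness));
    buffer.add(c.element().getValue());
  }

  @OnTimer("expiry")
  public void onExpiry(OnTimerContext c, @StateId("buffer") BagState<String> buffer) {
    // All new data for this window is droppable, and outputting here is safe;
    // this is where the remaining batch would be handed to perBatchFn.
    for (String element : buffer.read()) {
      c.output(element);
    }
    buffer.clear();
  }
}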

@OnWindowExpiration is (1) a convenience to save you from needing a handle
on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence of
state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and windowing
configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot 
wrote:


Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at
the State API to process trans-bundles, more questions about this to come).
My comments/questions are inline:


Le 17/01/2017 à 18:41, Ben Chambers a écrit :


We should start by understanding the goals. If elements are in different
windows, can they be in the same batch? If they have different
timestamps, what timestamp should the batch have?



Regarding timestamps: the current design is as follows: the transform does not
group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user-defined function
(perBatchFn) that gets called when batchSize elements have been processed.
This function takes an ArrayList as a parameter, so elements keep their
original timestamps.


Regarding windowing: I guess that if elements are not in the same window,
they are not expected to be in the same batch.
I'm just starting to work on these subjects, so I might lack a bit of
information;
what I am currently thinking about is that I need a way to know in the
DoFn that the window has expired so that I can call the perBatchFn even if
batchSize is not reached.  This is the @OnWindowExpiration callback that
Kenneth mentioned in an email about bundles.
Let's imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a fixed
windowing of 1 minute. Then each window contains 6 elements. If we were to
buffer the elements by batches of 5 elements, then for each window we
expect to get 2 batches (one of 5 elements, one of 1 element). For that to
happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn

As a composite transform this will likely require a group by key, which may
affect performance. Maybe within a DoFn is better.


Yes, the processing is done with a DoFn indeed.


Then it could be some annotation or API that informs the runner. Should
batch sizes be fixed in the annotation (element count or size) or should
the user have some method that lets them decide when to process a batch
based on the contents?


For now, the user passes batchSize as an argument to BatchParDo.via(); it
is a number of elements. But batching based on content might be useful for the
user. Giving a hint to the runner might be more flexible for the runner. Thanks.


Another thing to think about is whether this should be connected to the
ability to run parts of the bundle in parallel.


Yes!


Maybe each batch is an RPC
and you just want to start an async RPC for each batch. Then in addition to
starting the final RPC in finishBundle, you also need to wait for all the
RPCs to complete.


Actually, each batch's processing is currently whatever the user wants
(the user-defined perBatchFn function). If the user decides to issue an async
RPC in that function (called with the ArrayList of input elements), IMHO he
is responsible for waiting for the response in that method if he needs the
response, but he can also do a send-and-forget, depending on his use case.

Besides, I have also included a perElementFn user function to allow the
user to do some processing on the elements before adding them to the batch
(example use case: convert a String to a DTO object to invoke an external
service).
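
To give an idea of the intended use, a call site currently looks roughly
like this (MyDto and externalService are placeholders for user code, and
the exact signature of BatchParDo.via() is still moving):

// Hypothetical usage sketch of the transform described above.
PCollection<String> input = ...;
input.apply(BatchParDo.via(
    5,                                         // batchSize, in elements
    (String s) -> new MyDto(s),                // perElementFn
    (ArrayList<MyDto> batch) -> externalService.call(batch))); // perBatchFn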

Etienne

On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot

wrote:

Hi JB,

I meant jira vote but discussion on the ML works also :)

As I understand the need (see stackoverflow links in jira ticket) the
aim is to avoid the user having to code the batching logic in his own
DoFn.processElement() and DoFn.finishBundle() regardless of the bundles.
For example, a possible use case is to batch a call to 

Re: Default Timestamp and Watermark

2017-01-26 Thread Thomas Groh
The default timestamp should be BoundedWindow.TIMESTAMP_MIN_VALUE, which is
equivalent to -2**63 microseconds. We also occasionally refer to this
timestamp as "negative infinity".

The default watermark policy for a bounded source should be negative
infinity until all of the data is read, then positive infinity. There isn't
really a default watermark policy for an unbounded source - this is
dependent on the data that hasn't been read from that source, so it's
dependent on where you're reading from.

Currently, modifying the timestamp of an element from within a DoFn does
not modify the watermark; modifying a timestamp forwards in time is
generally "safe", as it can't cause data to move to behind the watermark -
this is why moving elements backwards in time requires setting
"withAllowedTimestampSkew" (which also doesn't modify the watermark, which
means that elements that are moved backwards in time can become late and be
dropped by a runner). I don't think we currently have any changes in-flight
to make this configurable.
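
For illustration, moving timestamps backwards looks roughly like this (the
100-second skew is arbitrary):

// Sketch: shifting elements back in time requires declaring the allowed
// skew; shifted elements may still become late and be dropped by a runner.
PCollection<String> shifted = input.apply(ParDo.of(
    new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.outputWithTimestamp(
            c.element(), c.timestamp().minus(Duration.standardSeconds(100)));
      }

      @Override
      public Duration getAllowedTimestampSkew() {
        return Duration.standardSeconds(100);
      }
    }));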

On Wed, Jan 25, 2017 at 9:24 PM, Shen Li  wrote:

> Hi,
>
> When reading from a source with no timestamp specified on elements, what
> should be the default timestamp? I presume that it should be 0 as I saw
> PAssertTest trying to set timestamps to very small values with 0 allowed
> timestamp skew. Is that right?
>
> What about the default watermark policy?
>
> If a ParDo modifies the timestamp using
> DoFnProcessContext.outputWithTimestamp, how should that affect the output
> watermark? Say the ParDo adds 100 seconds to the timestamp of each element
> in processElement, how could the runner know it should also add 100 seconds
> to output timestamps?
>
> Thanks,
>
> Shen
>


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Etienne Chauchot

Wonderful !

Thanks Kenn !

Etienne


Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding with a
TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
precisely - when this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while the
output timestamp is held to this point (so you can safely output into the
window).

@OnWindowExpiration is (1) a convenience to save you from needing a handle
on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence of
state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and windowing
configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot 
wrote:


Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at
the State API to process trans-bundles, more questions about this to come).
My comments/questions are inline:


Le 17/01/2017 à 18:41, Ben Chambers a écrit :


We should start by understanding the goals. If elements are in different
windows, can they be in the same batch? If they have different
timestamps, what timestamp should the batch have?


Regarding timestamps: the current design is as follows: the transform does not
group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user-defined function
(perBatchFn) that gets called when batchSize elements have been processed.
This function takes an ArrayList as a parameter, so elements keep their
original timestamps.


Regarding windowing: I guess that if elements are not in the same window,
they are not expected to be in the same batch.
I'm just starting to work on these subjects, so I might lack a bit of
information;
what I am currently thinking about is that I need a way to know in the
DoFn that the window has expired so that I can call the perBatchFn even if
batchSize is not reached.  This is the @OnWindowExpiration callback that
Kenneth mentioned in an email about bundles.
Let's imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a fixed
windowing of 1 minute. Then each window contains 6 elements. If we were to
buffer the elements by batches of 5 elements, then for each window we
expect to get 2 batches (one of 5 elements, one of 1 element). For that to
happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn

As a composite transform this will likely require a group by key, which may
affect performance. Maybe within a DoFn is better.


Yes, the processing is done with a DoFn indeed.


Then it could be some annotation or API that informs the runner. Should
batch sizes be fixed in the annotation (element count or size) or should
the user have some method that lets them decide when to process a batch
based on the contents?


For now, the user passes batchSize as an argument to BatchParDo.via(); it
is a number of elements. But batching based on content might be useful for the
user. Giving a hint to the runner might be more flexible for the runner. Thanks.


Another thing to think about is whether this should be connected to the
ability to run parts of the bundle in parallel.


Yes!


Maybe each batch is an RPC
and you just want to start an async RPC for each batch. Then in addition to
starting the final RPC in finishBundle, you also need to wait for all the
RPCs to complete.


Actually, each batch's processing is currently whatever the user wants
(the user-defined perBatchFn function). If the user decides to issue an async
RPC in that function (called with the ArrayList of input elements), IMHO he
is responsible for waiting for the response in that method if he needs the
response, but he can also do a send-and-forget, depending on his use case.

Besides, I have also included a perElementFn user function to allow the
user to do some processing on the elements before adding them to the batch
(example use case: convert a String to a DTO object to invoke an external
service).

Etienne

On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot

wrote:

Hi JB,

I meant jira vote but discussion on the ML works also :)

As I understand the need (see stackoverflow links in jira ticket) the
aim is to avoid the user having to code the batching logic in his own
DoFn.processElement() and DoFn.finishBundle() regardless of the bundles.
For example, a possible use case is to batch a call to an external service
(for performance).

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Eugene Kirpichov
Hi Etienne,

Could you post some snippets of how your transform is to be used in a
pipeline? I think that would make it easier to discuss on this thread and
could save a lot of churn if the discussion ends up leading to a different
API.

On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot 
wrote:

> Wonderful !
>
> Thanks Kenn !
>
> Etienne
>
>
> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> > Hi Etienne,
> >
> > I was drafting a proposal about @OnWindowExpiration when this email
> > arrived. I thought I would try to quickly unblock you by responding with a
> > TL;DR: you can achieve your goals with state & timers as they currently
> > exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
> > precisely - when this timer fires, you are guaranteed that the input
> > watermark has exceeded this point (so all new data is droppable) while the
> > output timestamp is held to this point (so you can safely output into the
> > window).
> >
> > @OnWindowExpiration is (1) a convenience to save you from needing a handle
> > on the allowed lateness (not a problem in your case) and (2) actually
> > meaningful and potentially less expensive to implement in the absence of
> > state (this is why it needs a design discussion at all, really).
> >
> > Caveat: these APIs are new and not supported in every runner and windowing
> > configuration.
> >
> > Kenn
> >
> > On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot 
> > wrote:
> >
> >> Hi,
> >>
> >> I have started to implement this ticket. For now it is implemented as a
> >> PTransform that simply does ParDo.of(new DoFn) and all the processing
> >> related to batching is done in the DoFn.
> >>
> >> I'm starting to deal with windows and bundles (starting to take a look at
> >> the State API to process trans-bundles, more questions about this to come).
> >> My comments/questions are inline:
> >>
> >>
> >> Le 17/01/2017 à 18:41, Ben Chambers a écrit :
> >>
> >>> We should start by understanding the goals. If elements are in different
> >>> windows, can they be in the same batch? If they have different
> >>> timestamps, what timestamp should the batch have?
> >>>
> >> Regarding timestamps: the current design is as follows: the transform does not
> >> group elements in the PCollection, so the "batch" does not exist as an
> >> element in the PCollection. There is only a user-defined function
> >> (perBatchFn) that gets called when batchSize elements have been processed.
> >> This function takes an ArrayList as a parameter, so elements keep their
> >> original timestamps.
> >>
> >>
> >> Regarding windowing: I guess that if elements are not in the same window,
> >> they are not expected to be in the same batch.
> >> I'm just starting to work on these subjects, so I might lack a bit of
> >> information;
> >> what I am currently thinking about is that I need a way to know in the
> >> DoFn that the window has expired so that I can call the perBatchFn even if
> >> batchSize is not reached. This is the @OnWindowExpiration callback that
> >> Kenneth mentioned in an email about bundles.
> >> Let's imagine that we have a collection of elements artificially
> >> timestamped every 10 seconds (for simplicity of the example) and a fixed
> >> windowing of 1 minute. Then each window contains 6 elements. If we were to
> >> buffer the elements by batches of 5 elements, then for each window we
> >> expect to get 2 batches (one of 5 elements, one of 1 element). For that to
> >> happen, we need an @OnWindowExpiration on the DoFn where we call
> >> perBatchFn
> >>
> >> As a composite transform this will likely require a group by key which may
> >>> affect performance. Maybe within a DoFn is better.
> >>>
> >> Yes, the processing is done with a DoFn indeed.
> >>
> >>> Then it could be some annotation or API that informs the runner. Should
> >>> batch sizes be fixed in the annotation (element count or size) or should
> >>> the user have some method that lets them decide when to process a batch
> >>> based on the contents?
> >>>
> >> For now, the user passes batchSize as an argument to BatchParDo.via(); it
> >> is a number of elements. But batching based on content might be useful for
> >> the user. Giving a hint to the runner might be more flexible for the runner.
> >> Thanks.
> >>
> >>> Another thing to think about is whether this should be connected to the
> >>> ability to run parts of the bundle in parallel.
> >>>
> >> Yes!
> >>
> >>> Maybe each batch is an RPC
> >>> and you just want to start an async RPC for each batch. Then in addition
> >>> to starting the final RPC in finishBundle, you also need to wait for all the
> >>> RPCs to complete.
> >>>
> >> Actually, each batch's processing is currently whatever the user wants
> >> (the user-defined perBatchFn function). If the user decides to issue an
> >> async RPC in that function (called with the ArrayList of input elements),
> >> IMHO he is responsible for waiting for the 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
First off, let me say that a *correctly* batching DoFn provides a lot of
value, especially because it's (too) easy to (often unknowingly)
implement it incorrectly.

My take is that a BatchingParDo should be a PTransform that takes a
DoFn<Iterable<I>, ? extends Iterable<O>> as a parameter, as well as some
(optional?) batching criteria (probably batch size and/or batch timeout).
The DoFn should map the set of inputs to a set of outputs of the same size
and in the same order as the input (or, possibly, an empty list would be
acceptable). Semantically, it should be defined as

public PCollection<O> expand(PCollection<I> input) {
  return input
      .apply(e -> SingletonList.of(e))
      .apply(parDo(batchDoFn))
      .apply(es -> Iterables.onlyElement(es));
}

Getting this correct wrt timestamps and windowing is tricky. However,
even something that handles the most trivial case (e.g. GlobalWindows
only) and degenerates to batch sizes of 1 for other cases would allow
people to start using this code (rather than rolling their own) and we
could then continue to refine it.

More responses inline below.

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot  wrote:
> Hi,
>
> I have started to implement this ticket. For now it is implemented as a
> PTransform that simply does ParDo.of(new DoFn) and all the processing
> related to batching is done in the DoFn.
>
> I'm starting to deal with windows and bundles (starting to take a look at
> the State API to process trans-bundles, more questions about this to come).
> My comments/questions are inline:
>
> Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>>
>> We should start by understanding the goals. If elements are in different
>> windows, can they be in the same batch? If they have different
>> timestamps, what timestamp should the batch have?
>
> Regarding timestamps: the current design is as follows: the transform does not
> group elements in the PCollection, so the "batch" does not exist as an
> element in the PCollection. There is only a user-defined function
> (perBatchFn) that gets called when batchSize elements have been processed.
> This function takes an ArrayList as a parameter, so elements keep their
> original timestamps.

Correct, elements must keep their original timestamps. This is one
reason @OnWindowExpiration is insufficient. The watermark needs to be
held back to the timestamp of the earliest element in the buffer.

> Regarding windowing: I guess that if elements are not in the same window,
> they are not expected to be in the same batch.

Batching should be possible across windows, as long as the
innerBatchDoFn does not take the Window (or window-dependent side
inputs) as parameters. Note in particular, if there is ever
non-trivial windowing, after a GBK each successive element is almost
certainly in a different window from its predecessor, which would make
emitting after each window change useless.

> I'm just starting to work on these subjects, so I might lack a bit of
> information;
> what I am currently thinking about is that I need a way to know in the DoFn
> that the window has expired so that I can call the perBatchFn even if
> batchSize is not reached.  This is the @OnWindowExpiration callback that
> Kenneth mentioned in an email about bundles.
> Let's imagine that we have a collection of elements artificially timestamped
> every 10 seconds (for simplicity of the example) and a fixed windowing of 1
> minute. Then each window contains 6 elements. If we were to buffer the
> elements by batches of 5 elements, then for each window we expect to get 2
> batches (one of 5 elements, one of 1 element). For that to happen, we need an
> @OnWindowExpiration on the DoFn where we call perBatchFn
>
>> As a composite transform this will likely require a group by key which may
>> affect performance. Maybe within a DoFn is better.
>
> Yes, the processing is done with a DoFn indeed.

However, without a GBK it is unclear which key state would be stored
with respect to. (On that note, one should be able to batch across
keys, which makes using the state API as is difficult.)

>> Then it could be some annotation or API that informs the runner. Should
>> batch sizes be fixed in the annotation (element count or size) or should
>> the user have some method that lets them decide when to process a batch
>> based on the contents?
>
> For now, the user passes batchSize as an argument to BatchParDo.via(); it is
> a number of elements. But batching based on content might be useful for the
> user. Giving a hint to the runner might be more flexible for the runner. Thanks.

We should allow for runners to tune this parameter. We should also
allow for time-based batch expiration.

>> Another thing to think about is whether this should be connected to the
>> ability to run parts of the bundle in parallel.
>
> Yes!

This is, in some sense, a "sliding batch" but many of the concerns
(e.g. holding the watermark, outputting with the correct timestamps
and windows) are similar. The semantics of 

Re: Committed vs. attempted metrics results

2017-01-26 Thread Ben Chambers
I think relaxing the query to not be an exact match is reasonable. I'm
wondering if it should be substring or regex. Either one preserves the
existing behavior of, when passed a full step path, returning only the
metrics for that specific step, but it adds the ability to know the step
name only approximately.

A) Util or not seems fine to me. If it seems likely to be reusable let's do
so. Preferably in a separate PR or commit.

B) Yes, direct runner should match whatever semantics we choose.

On Thu, Jan 26, 2017, 5:30 AM Aviem Zur  wrote:

> Ben - yes, there is still some ambiguity regarding the querying of the
> metrics results.
>
> You've discussed in this thread the notion that metrics step names should
> be converted to unique names when aggregating metrics, so that each step
> will aggregate its own metrics, and not join with other steps by mistake.
> The way you suggested to query this is that when querying by step, all
> steps which contain the query string as a substring will show up in the
> results.
>
> This sounds fine; however, this is not how the direct runner is implemented
> right now - it is an exact match.
>
> In my PR I copied the MetricsResults filtering methods directly from the
> direct runner and suggested that since I copied them verbatim perhaps they
> should be pulled up to a more central module, and then all runners could
> use them.
>
> So, my remaining questions are:
> A) Should we create a util for filtering MetricsResults based on a query,
> using the filtering implementation suggested in this thread (substring
> match)?
> B) Should the direct runner be changed to use such a util, and conform to
> the suggested results filtering?
>
> On Tue, Jan 24, 2017 at 2:03 AM Ben Chambers
> wrote:
>
> > For the short term, it seems like staying with the existing Query API and
> > allowing runners to throw exceptions if a user issues a query that is not
> > supported is reasonable. It shouldn't affect the ability to run a Beam
> > pipeline on other runners, since the Query API is only exposed *after* the
> > pipeline is run.
> >
> > For the longer term, it seems like the Query API could merit some more
> > thought, especially if people have good use cases for accessing the value
> > of metrics programmatically from the same program that ran the original
> > pipeline.
> >
> > Aviem -- is there anything specific that needs to be discussed/nailed down
> > to help with your PR?
> >
> > On Thu, Jan 19, 2017 at 3:57 PM Ben Chambers 
> wrote:
> >
> > > On Thu, Jan 19, 2017 at 3:28 PM Amit Sela 
> wrote:
> > >
> > > On Fri, Jan 20, 2017 at 1:18 AM Ben Chambers
> > > wrote:
> > >
> > > > Part of the problem here is that whether attempted or committed is "what
> > > > matters" depends on the metric. If you're counting the number of RPCs to an
> > > > external service, you may want all the attempts (to predict your bill). If
> > > > you're tracking how long those RPCs took, you may want it just to be the
> > > > committed (e.g., what is the best-case time to execute your pipeline). This
> > > > is essentially Luke's case of wanting one or the other.
> > > >
> > > This sounds like Metrics should have a user-defined guarantee level,
> > > which might make more sense - Metrics.counter().attempted()/committed() -
> > > though this might prove more challenging for runners to implement.
> > >
> > >
> > > We went away from that because of cases like Luke's where a user might
> > > want to compare the two. Or, not even realize there is a difference
> > > up-front, so declaring ahead of time is difficult. If both are available
> > > and can be looked at, if they're the same -- no problems. If they're
> > > different, then it provides a good reason to investigate and figure out the
> > > difference.
> > >
> > >
> > > >
> > > > Regarding the step names -- the metrics are reported using the full step
> > > > name, which is also made unique during the Graph API. So "My
> > > > Composite/ParDo(MyDoFn)" or "My Composite/ParDo(MyDoFn)2" if there are
> > > > multiple instances within the same composite. Specifically -- the names are
> > > > made unique prior to recording metrics, so there are no double counts.
> > > >
> > > But how would the user know that? I'm afraid this could be confusing as a
> > > user-facing query API, and I think most users would simply name metrics
> > > differently.
> > >
> > >
> > > The query API can support querying based on a sub-string of the full name,
> > > and return metrics from all steps that match. That would allow the user to
> > > query metrics without knowing that. Having unique names for steps is
> > > important and useful for many other things (logging, associating time spent
> > > executing, etc.).
> > >
> > >
> > > >
> > > > On Thu, Jan 19, 2017 at 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Jean-Baptiste Onofré

Agree, I'm curious as well.

I guess it would be something like:

.apply(ParDo.of(new DoFn<InputT, OutputT>() {

  @Override
  public long batchSize() {
    return 1000;
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    ...
  }
}));

If batchSize (overridden by the user) returns a positive long, then the
DoFn can batch with this size.


Regards
JB

On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:

Hi Etienne,

Could you post some snippets of how your transform is to be used in a
pipeline? I think that would make it easier to discuss on this thread and
could save a lot of churn if the discussion ends up leading to a different
API.

On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot 
wrote:


Wonderful !

Thanks Kenn !

Etienne


Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :

Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding with a
TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
precisely - when this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while the
output timestamp is held to this point (so you can safely output into the
window).

@OnWindowExpiration is (1) a convenience to save you from needing a handle
on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence of
state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and windowing
configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot 
wrote:


Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at
the State API to process trans-bundles, more questions about this to come).
My comments/questions are inline:


Le 17/01/2017 à 18:41, Ben Chambers a écrit :


We should start by understanding the goals. If elements are in different
windows, can they be in the same batch? If they have different
timestamps, what timestamp should the batch have?


Regarding timestamps: the current design is as follows: the transform does not
group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user-defined function
(perBatchFn) that gets called when batchSize elements have been processed.
This function takes an ArrayList as a parameter, so elements keep their
original timestamps.


Regarding windowing: I guess that if elements are not in the same window,
they are not expected to be in the same batch.
I'm just starting to work on these subjects, so I might lack a bit of
information;
what I am currently thinking about is that I need a way to know in the
DoFn that the window has expired so that I can call the perBatchFn even if
batchSize is not reached. This is the @OnWindowExpiration callback that
Kenneth mentioned in an email about bundles.
Let's imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a fixed
windowing of 1 minute. Then each window contains 6 elements. If we were to
buffer the elements by batches of 5 elements, then for each window we
expect to get 2 batches (one of 5 elements, one of 1 element). For that to
happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn


As a composite transform this will likely require a group by key which may
affect performance. Maybe within a DoFn is better.


Yes, the processing is done with a DoFn indeed.


Then it could be some annotation or API that informs the runner. Should
batch sizes be fixed in the annotation (element count or size) or should
the user have some method that lets them decide when to process a batch
based on the contents?


For now, the user passes batchSize as an argument to BatchParDo.via(); it
is a number of elements. But batching based on content might be useful for
the user. Giving a hint to the runner might be more flexible for the runner.
Thanks.


Another thing to think about is whether this should be connected to the
ability to run parts of the bundle in parallel.


Yes!


Maybe each batch is an RPC
and you just want to start an async RPC for each batch. Then in addition
to starting the final RPC in finishBundle, you also need to wait for all the
RPCs to complete.


Actually, each batch's processing is currently whatever the user wants
(the user-defined perBatchFn function). If the user decides to issue an
async RPC in that function (called with the ArrayList of input elements),
IMHO he is responsible for waiting for the response in that method if he 

Re: Default Timestamp and Watermark

2017-01-26 Thread Kenneth Knowles
On Thu, Jan 26, 2017 at 9:48 AM, Thomas Groh 
wrote:
>
> The default watermark policy for a bounded source should be negative
> infinity until all of the data is read, then positive infinity.


Just to elaborate - there isn't a way for a bounded source to communicate a
watermark. Runners each do this internally.

Currently, modifying the timestamp of an element from within a DoFn does
> not modify the watermark; modifying a timestamp forwards in time is
> generally "safe", as it can't cause data to move to behind the watermark -
> this is why moving elements backwards in time requires setting
> "withAllowedTimestampSkew" (which also doesn't modify the watermark, which
> means that elements that are moved backwards in time can become late and be
> dropped by a runner). I don't think we currently have any changes in-flight
> to make this configurable.
>

There has been one proposal to provide adequate watermark-timestamp
interaction via a new model-level AdjustTimestamps primitive, but it needs
more discussion and implementation has not begun. The ticket also goes into
some of the same issues Thomas has described:
https://issues.apache.org/jira/browse/BEAM-644 is related.

(I am the author of the ticket, but the proposal there was developed
collaboratively so it has at least slightly more buy-in than just me :-)

Kenn


> On Wed, Jan 25, 2017 at 9:24 PM, Shen Li  wrote:
>
> > Hi,
> >
> > When reading from a source with no timestamp specified on elements, what
> > should be the default timestamp? I presume that it should be 0 as I saw
> > PAssertTest trying to set timestamps to very small values with 0 allowed
> > timestamp skew. Is that right?
> >
> > What about the default watermark policy?
> >
> > If a ParDo modifies the timestamp using
> > DoFnProcessContext.outputWithTimestamp, how should that affect the
> output
> > watermark? Say the ParDo adds 100 seconds to the timestamp of each
> element
> > in processElement, how could the runner know it should also add 100
> seconds
> > to output timestamps?
> >
> > Thanks,
> >
> > Shen
> >
>


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Kenneth Knowles
On Thu, Jan 26, 2017 at 4:15 PM, Robert Bradshaw <
rober...@google.com.invalid> wrote:

> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
>  wrote:
> >
> > you can't wrap DoFn's, period
>
> As a simple example, given a DoFn<A, B> it's perfectly natural to want
> to "wrap" this as a DoFn<KV<K, A>, KV<K, B>>. State, side inputs,
> windows, etc. would just be passed through.


> The fact that this is complicated, with reflection and flexible
> signatures and byte generation, is a property of the SDK (to provide a
> flexible DoFn API). I agree that it's nice to hide this complexity
> from the user, and it discourages this kind of composability.
>


The difficulty of this sort of composability is a particularly bothersome
issue for DoFn. It is solvable but the solutions may seem esoteric.

 - Supporting wrapped _invocation_ is actually as easy as before if we
choose to embrace it: ArgumentProvider is roughly the same thing as ye olde
ProcessContext. We can easily provide it via our parameter mechanism, and
DoFnInvokers can be used or we can also provide a DoFnInvoker for some
requested DoFn.

 - Supporting wrapped analysis is a bit uglier but I don't think there are
technical blockers. It was actually quite bad prior to the new DoFn - you
couldn't know statically whether the wrapper class had to "implements
RequiresWindowAccess" so you would just have to do it "just in case". With
the new DoFn I could imagine a `@Wraps public DoFn getWrapped()` though
we'd still have to be able to merge this with whatever the wrapper requires
- for example if they both use state there will be nothing calling out the
fact that the wrapper needs to create a disjoint namespace. We could even
make this required if there is an ArgumentProvider parameter and/or
automatically provide a DoFnInvoker for this DoFn (so DoFnInvoker would be
in the core SDK and DoFnInvokers would be only in the SDK Fn harness).

So I think the analysis problems are fundamental, not part of the
particulars of the new DoFn API or any particular SDK.


Coming back to the ticket... I'm going to echo Ben's early point, and now
Eugene's and Robert's that we should enumerate further use cases explicitly
and preferably add them to the JIRA.

Both SO questions are answered with essentially an inlined
PTransform<PCollection<T>, PCollection<Iterable<T>>> with a maximum
iterable size, for batched RPCs downstream. You
can easily build timestamped and reified-windows versions without that
transform being aware of it. It is simple to understand, as easy to
implement a unified-model version via state & timers as those SO answers,
and doesn't require DoFn nesting. I would love to learn more about use
cases that either debunk this or refine it. Also one transform does not
need to serve all uses; we just need it to serve its intended use properly
and try not to tempt misuse.
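
For reference, the inlined pattern from those answers is roughly the
following sketch (global-window assumption; externalService is a
placeholder for the downstream RPC):

// Sketch: buffer in @ProcessElement, flush in @FinishBundle. Only safe
// as-is under global windowing, since finishBundle does not know which
// window buffered elements belonged to.
class BatchedRpcFn extends DoFn<String, Void> {
  private static final int MAX_BATCH_SIZE = 500;
  private transient List<String> batch;

  @StartBundle
  public void startBundle(Context c) {
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    batch.add(c.element());
    if (batch.size() >= MAX_BATCH_SIZE) {
      flush();
    }
  }

  @FinishBundle
  public void finishBundle(Context c) {
    flush(); // the bundle is the commitment boundary
  }

  private void flush() {
    if (!batch.isEmpty()) {
      externalService.call(batch); // placeholder RPC
      batch.clear();
    }
  }
}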

Focusing briefly on the windowing issues, the outside world is globally
windowed. So in those communications the data is in the global window
whether you want it to be or not. IMO with rare exceptions rewindowing to
the global window silently (such as by making an RPC ignoring the window)
is data loss (or maybe just well-documented data discarding :-). So
window-aware write transforms (with special case of the global window being
"already ready") are a good idea from the start. It probably makes sense to
have consistently windowed output from a window-aware write, so the above
transform should operate per-window or else reify windows, batch in the
global window, then restore windows (requires BEAM-1287 or a WindowFn).

Kenn


Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-26 Thread María García Herrero
Congratulations and thank you for your contributions thus far!

On Thu, Jan 26, 2017 at 6:00 PM, Robert Bradshaw <
rober...@google.com.invalid> wrote:

> Welcome and congratulations!
>
> On Thu, Jan 26, 2017 at 5:05 PM, Sourabh Bajaj
>  wrote:
> > Congrats!!
> >
> > On Thu, Jan 26, 2017 at 5:02 PM Jason Kuster
> > wrote:
> >
> >> Congrats all! Very exciting. :)
> >>
> >> On Thu, Jan 26, 2017 at 4:48 PM, Jesse Anderson 
> >> wrote:
> >>
> >> > Welcome!
> >> >
> >> > On Thu, Jan 26, 2017, 7:27 PM Davor Bonaci  wrote:
> >> >
> >> > > Please join me and the rest of Beam PMC in welcoming the following
> >> > > contributors as our newest committers. They have significantly
> >> > contributed
> >> > > to the project in different ways, and we look forward to many more
> >> > > contributions in the future.
> >> > >
> >> > > * Stas Levin
> >> > > Stas has contributed across the breadth of the project, from the
> Spark
> >> > > runner to the core pieces and Java SDK. Looking at code
> contributions
> >> > > alone, he authored 43 commits and reported 25 issues. Stas is very
> >> active
> >> > > on the mailing lists too, contributing to good discussions and
> >> proposing
> >> > > improvements to the Beam model.
> >> > >
> >> > > * Ahmet Altay
> >> > > Ahmet is a major contributor to the Python SDK, both in terms of
> design
> >> > and
> >> > > code contribution. Looking at code contributions alone, he authored
> 98
> >> > > commits and reviewed dozens of pull requests. With Python SDK’s
> >> imminent
> >> > > merge to the master branch, Ahmet contributed towards establishing a
> >> new
> >> > > major component in Beam.
> >> > >
> >> > > * Pei He
> >> > > Pei has been contributing to Beam since its inception, accumulating
> a
> >> > total
> >> > > of 118 commits since February. He has made several major
> contributions,
> >> > > most recently by redesigning IOChannelFactory / FileSystem APIs (in
> >> > > progress), which would extend Beam’s portability to many additional
> >> file
> >> > > systems and cloud providers.
> >> > >
> >> > > Congratulations to all three! Welcome!
> >> > >
> >> > > Davor
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> ---
> >> Jason Kuster
> >> Apache Beam (Incubating) / Google Cloud Dataflow
> >>
>


Re: Conceptually, what are bundles?

2017-01-26 Thread Etienne Chauchot



Le 25/01/2017 à 20:34, Kenneth Knowles a écrit :

There's actually not a JIRA filed beyond BEAM-25 for what Eugene is
referring to. Context: Prior to windowing and streaming it was safe to
buffer elements in @ProcessElement and then actually perform output in
@FinishBundle. This pattern is only suitable for global windowing, flushing
to external systems, or requires perhaps complex manual window hackery. So
now we'll need a new callback @OnWindowExpiration that occurs
per-resident-window, before @FinishBundle, for producing output based on
remaining state before it is discarded.
+1 This is exactly what I need for BEAM-135. Let's imagine that we have a
collection of elements artificially timestamped every 10 seconds and a
fixed windowing of 1 minute. Then each window contains 6 elements. If we
were to buffer the elements by batches of 5 elements, then for each
window we expect to get 2 batches (one of 5 elements, one of 1 element).
For that to happen, we need the @OnWindowExpiration on the DoFn to
materialize the batch of 1 element.



On Wed, Jan 25, 2017 at 11:00 AM, Eugene Kirpichov 
wrote:


One more thing.

I think ideally, bundles should not leak into the model at all - e.g.
ideally, startBundle/finishBundle methods in DoFn should not exist. They
interact poorly with windowing.
The proper way to address what is commonly done in these methods is either
Setup/Teardown methods, or a (to be designed) "window close" callback -
there's a JIRA about the latter but I couldn't find it, perhaps +Kenn
Knowles  remembers what it is.

On Wed, Jan 25, 2017 at 10:41 AM Amit Sela  wrote:


On Wed, Jan 25, 2017 at 8:23 PM Thomas Groh 
wrote:


I have a couple of points in addition to what Robert said

Runners are permitted to determine bundle sizes as appropriate to their
implementation, so long as bundles are atomically committed. The

contents

of a PCollection are independent of the bundling of that PCollection.

Runners can process all elements within their own bundles (e.g.
https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a63841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L289),
the entire input data, or anywhere in between.


Or, as Thomas mentioned, a runner could process an entire
partition of the data as a bundle. It basically depends on the runner's
internal processing model.


On Wed, Jan 25, 2017 at 10:05 AM, Robert Bradshaw <
rober...@google.com.invalid> wrote:


Bundles are simply the unit of commitment (retry) in the Beam SDK.
They're not really a model concept, but do leak from the
implementation into the API as it's not feasible to checkpoint every
individual process call, and this allows some state/compute/... to be
safely amortized across elements (either the results of all processed
elements in a bundle are sent downstream, or none are and the entire
bundle is retried).

On Wed, Jan 25, 2017 at 9:36 AM, Matthew Jadczak 

wrote:

Hi,

I’m a finalist CompSci student at the University of Cambridge, and for
my final project/dissertation I am writing an implementation of the Beam
SDK in Elixir [1]. Given that the Beam project is obviously still very much
WIP, it’s still somewhat difficult to find good conceptual overviews of
parts of the system, which is crucial when translating the OOP architecture
to something completely different. However I have found many of the design
docs scattered around the JIRA and here very helpful. (Incidentally,
perhaps it would be helpful to maintain a list of them, to help any
contributors acquaint themselves with the conceptual vision of the
implementation?)

One thing which I have not yet been able to work out is the significance
of “bundles” in the SDK. On the one hand, it seems that they are simply an
implementation detail, effectively a way to do micro-batch processing
efficiently, and indeed they are not mentioned at all in the original
Dataflow paper or anywhere in the Beam docs (except in passing). On the
other hand, it seems most of the key transforms in the SDK core have a
concept of bundles and operate in their terms in practice, while all
conceptually being described as just operating on elements.

Do bundles have semantic meaning in the Beam Model? Are there any
guidelines as to how a given transform should split its output up into
bundles? Should any runner/SDK implementing the Model have that concept,
even when other primitives for streaming data processing including things
like 

Re: Conceptually, what are bundles?

2017-01-26 Thread Jean-Baptiste Onofré

It makes sense.

Agreed.

Regards
JB

On 01/25/2017 08:34 PM, Kenneth Knowles wrote:

There's actually not a JIRA filed beyond BEAM-25 for what Eugene is
referring to. Context: Prior to windowing and streaming it was safe to
buffer elements in @ProcessElement and then actually perform output in
@FinishBundle. This pattern is only suitable for global windowing, flushing
to external systems, or requires perhaps complex manual window hackery. So
now we'll need a new callback @OnWindowExpiration that occurs
per-resident-window, before @FinishBundle, for producing output based on
remaining state before it is discarded.


On Wed, Jan 25, 2017 at 11:00 AM, Eugene Kirpichov 
wrote:


One more thing.

I think ideally, bundles should not leak into the model at all - e.g.
ideally, startBundle/finishBundle methods in DoFn should not exist. They
interact poorly with windowing.
The proper way to address what is commonly done in these methods is either
Setup/Teardown methods, or a (to be designed) "window close" callback -
there's a JIRA about the latter but I couldn't find it, perhaps +Kenn
Knowles  remembers what it is.

On Wed, Jan 25, 2017 at 10:41 AM Amit Sela  wrote:


On Wed, Jan 25, 2017 at 8:23 PM Thomas Groh 
wrote:


I have a couple of points in addition to what Robert said

Runners are permitted to determine bundle sizes as appropriate to their
implementation, so long as bundles are atomically committed. The

contents

of a PCollection are independent of the bundling of that PCollection.

Runners can process all elements within their own bundles (e.g.
https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a63841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L289),
the entire input data, or anywhere in between.


Or, as Thomas mentioned, a runner could process an entire
partition of the data as a bundle. It basically depends on the runner's
internal processing model.



On Wed, Jan 25, 2017 at 10:05 AM, Robert Bradshaw <
rober...@google.com.invalid> wrote:


Bundles are simply the unit of commitment (retry) in the Beam SDK.
They're not really a model concept, but do leak from the
implementation into the API as it's not feasible to checkpoint every
individual process call, and this allows some state/compute/... to be
safely amortized across elements (either the results of all processed
elements in a bundle are sent downstream, or none are and the entire
bundle is retried).

On Wed, Jan 25, 2017 at 9:36 AM, Matthew Jadczak 

wrote:

Hi,

I’m a finalist CompSci student at the University of Cambridge, and for
my final project/dissertation I am writing an implementation of the Beam
SDK in Elixir [1]. Given that the Beam project is obviously still very much
WIP, it’s still somewhat difficult to find good conceptual overviews of
parts of the system, which is crucial when translating the OOP architecture
to something completely different. However I have found many of the design
docs scattered around the JIRA and here very helpful. (Incidentally,
perhaps it would be helpful to maintain a list of them, to help any
contributors acquaint themselves with the conceptual vision of the
implementation?)

One thing which I have not yet been able to work out is the significance
of “bundles” in the SDK. On the one hand, it seems that they are simply an
implementation detail, effectively a way to do micro-batch processing
efficiently, and indeed they are not mentioned at all in the original
Dataflow paper or anywhere in the Beam docs (except in passing). On the
other hand, it seems most of the key transforms in the SDK core have a
concept of bundles and operate in their terms in practice, while all
conceptually being described as just operating on elements.

Do bundles have semantic meaning in the Beam Model? Are there any
guidelines as to how a given transform should split its output up into
bundles? Should any runner/SDK implementing the Model have that concept,
even when other primitives for streaming data processing including things
like efficiently transmitting individual elements between stages with
backpressure are available in the language/standard libraries? Are there
any insights here that I am missing, i.e. were problems present in early
versions of the runners solved by adding the concept of bundles?

Thanks so much,
Matt

[1] http://elixir-lang.org/

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - 

Re: Better developer instructions for using Maven?

2017-01-26 Thread Aljoscha Krettek
+1 to what Dan said

On Wed, 25 Jan 2017 at 21:40 Kenneth Knowles  wrote:

> +1
>
> On Jan 25, 2017 11:15, "Jean-Baptiste Onofré"  wrote:
>
> > +1
> >
> > It sounds good to me.
> >
> > Thanks Dan !
> >
> > Regards
> > JB
> >
> > On Jan 25, 2017, at 19:39, Dan Halperin
> > wrote:
> > >Here is my summary of the threads:
> > >
> > >Overwhelming agreement:
> > >
> > >- rename `release` to something more appropriate.
> > >- add `checkstyle` to the default build (it's basically a compile
> > >error)
> > >- add more information to contributor guide
> > >
> > >Reasonable agreement
> > >
> > >- don't update the github instructions to make passing `mvn verify
> > >-P <all checks>` mandatory. Maybe add a hint that this is a good proxy for what
> > >Jenkins will run.
> > >
> > >Unresolved:
> > >
> > >- whether all checks should be in `mvn verify`
> > >- whether `mvn test` is useful for most workflows
> > >
> > >I'll propose to proceed with the overwhelmingly agreed-upon changes,
> > >and as
> > >we see increasingly many new contributors re-evaluate the remaining
> > >issues.
> > >
> > >Thanks,
> > >Dan
> > >
> > >On Tue, Jan 24, 2017 at 12:51 PM, Jean-Baptiste Onofré
> > >
> > >wrote:
> > >
> > >> +1 to at least update the contribution guide and improve the profile
> > >name.
> > >>
> > >> Regards
> > >> JB
> > >>
> > >>
> > >> On 01/24/2017 09:49 PM, Kenneth Knowles wrote:
> > >>
> > >>> My impression is that we don't have consensus on whether all checks or
> > >>> minimal checks should be the default, or whether we can have both via
> > >>> `mvn test` and `mvn verify`.
> > >>>
> > >>> But that doesn't prevent us from giving -P release a better name and
> > >>> mentioning it in the dev guide and in some manner in our PR template.
> > >>>
> > >>> Right now we are living with the combination of the bad aspects - default
> > >>> is not thorough but not actually fast and a thorough check is
> > >>> undocumented.
> > >>>
> > >>> On Tue, Jan 24, 2017 at 2:22 AM, Ismaël Mejía 
> > >wrote:
> > >>>
> > >>> I just wanted to know if we have achieved some consensus about this, I
> > >>> just saw this PR that reminded me about this discussion.
> > >>>
> > >>> https://github.com/apache/beam/pull/1829
> > >>>
> > >>> It is important that we mention the existing profiles (and the intended
> > >>> checks) in the contribution guide (e.g. -Prelease (or -Pall-checks)
> > >>> triggers these validations).
> > >>>
> > >>> I can add this to the guide if you like once we define the checks per
> > >>> stage/profile.
> > >>>
> > >>> Ismaël
> > 
> > 
> >  On Wed, Jan 11, 2017 at 8:12 AM, Aviem Zur 
> > >wrote:
> > 
> >  I agree with Dan and Lukasz.
> > > Developers should not be expected to know beforehand which
> > >specific
> > > profiles to run.
> > > The phase specified in the PR instructions (`verify`) should run
> > >all the
> > > relevant verifications and be the "slower" build, while a
> > >preceding
> > > lifecycle, such as `test`, should run the "faster" verifications.
> > >
> > > Aviem.
> > >
> > > On Mon, Jan 9, 2017 at 7:57 PM Robert Bradshaw
> > >
> >   > 
> > >
> > >> wrote:
> > >
> > > On Mon, Jan 9, 2017 at 3:49 AM, Aljoscha Krettek
> > >
> > >> wrote:
> > >>
> > >>> I also usually prefer "mvn verify" to do the expected thing but I see
> > >>> that quick iteration times are key.
> > >>>
> > >>
> > >> I see
> > >> https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html
> > >>
> > >> verify - run any checks on results of integration tests to ensure
> > >> quality criteria are met
> > >>
> > >> Of course our integration tests are long enough that we shouldn't
> > >be
> > >> putting all of them here, but I too would expect checkstyle.
> > >>
> > >> Perhaps we could introduce a verify-fast or somesuch for fast (but
> > >> lower coverage) turnaround time. I would expect "mvn verify test" to
> > >> pass before submitting a PR, and would want to run that before asking
> > >> others to look at it. I think this should be our criteria (i.e. what
> > >> will a new but maven-savvy user run before pushing their code).
> > >>
> > >>> As long as the pre-commit hooks still check everything I'm ok with
> > >>> making the default a little more lightweight.
> > >>
> > >> The fact that our pre-commit hooks take a long time to run does change
> > >> things. Nothing more annoying than seeing that your PR failed 3 hours
> > >> 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Eugene Kirpichov
I don't think we should make batching a core feature of the Beam
programming model (by adding it to DoFn as this code snippet implies). I'm
reasonably sure there are less invasive ways of implementing it.
On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
wrote:

> Agree, I'm curious as well.
>
> I guess it would be something like:
>
> .apply(ParDo.of(new DoFn<I, O>() {
>
> @Override
> public long batchSize() {
>   return 1000;
> }
>
> @ProcessElement
> public void processElement(ProcessContext context) {
>   ...
> }
> }));
>
> If batchSize (overridden by user) returns a positive long, then DoFn can
> batch with this size.
>
> Regards
> JB
>
> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> > Hi Etienne,
> >
> > Could you post some snippets of how your transform is to be used in a
> > pipeline? I think that would make it easier to discuss on this thread and
> > could save a lot of churn if the discussion ends up leading to a
> different
> > API.
> >
> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot 
> > wrote:
> >
> >> Wonderful !
> >>
> >> Thanks Kenn !
> >>
> >> Etienne
> >>
> >>
> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> >>> Hi Etienne,
> >>>
> >>> I was drafting a proposal about @OnWindowExpiration when this email
> >>> arrived. I thought I would try to quickly unblock you by responding
> with
> >> a
> >>> TL;DR: you can achieve your goals with state & timers as they currently
> >>> exist. You'll set a timer for
> window.maxTimestamp().plus(allowedLateness)
> >>> precisely - when this timer fires, you are guaranteed that the input
> >>> watermark has exceeded this point (so all new data is droppable) while
> >> the
> >>> output timestamp is held to this point (so you can safely output into
> the
> >>> window).
> >>>
> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
> >> handle
> >>> on the allowed lateness (not a problem in your case) and (2) actually
> >>> meaningful and potentially less expensive to implement in the absence
> of
> >>> state (this is why it needs a design discussion at all, really).
> >>>
> >>> Caveat: these APIs are new and not supported in every runner and
> >> windowing
> >>> configuration.
> >>>
> >>> Kenn
> >>>
> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot  >
> >>> wrote:
> >>>
>  Hi,
> 
>  I have started to implement this ticket. For now it is implemented as
> a
>  PTransform that simply does ParDo.of(new DoFn) and all the processing
>  related to batching is done in the DoFn.
> 
>  I'm starting to deal with windows and bundles (starting to take a look
> >> at
>  the State API to process trans-bundles, more questions about this to
> >> come).
>  My comments/questions are inline:
> 
> 
>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
> 
> > We should start by understanding the goals. If elements are in different
> > windows can they be put in the same batch? If they have different
> > timestamps what timestamp should the batch have?
> >
>  Regarding timestamps: currently the design is as follows: the transform
>  does not group elements in the PCollection, so the "batch" does not exist
>  as an element in the PCollection. There is only a user-defined function
>  (perBatchFn) that gets called when batchSize elements have been processed.
>  This function takes an ArrayList as a parameter, so elements keep their
>  original timestamps.
> 
> 
>  Regarding windowing: I guess that if elements are not in the same window,
>  they are not expected to be in the same batch.
>  I'm just starting to work on these subjects, so I might lack a bit of
>  information;
>  what I am currently thinking about is that I need a way to know in the
>  DoFn that the window has expired so that I can call the perBatchFn even if
>  batchSize is not reached. This is the @OnWindowExpiration callback that
>  Kenneth mentioned in an email about bundles.
>  Let's imagine that we have a collection of elements artificially
>  timestamped every 10 seconds (for simplicity of the example) and a fixed
>  windowing of 1 minute. Then each window contains 6 elements. If we were to
>  buffer the elements in batches of 5 elements, then for each window we
>  expect to get 2 batches (one of 5 elements, one of 1 element). For that to
>  happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn
> 
> > As a composite transform this will likely require a group by key which may
> > affect performance. Maybe within a DoFn is better.
> >
>  Yes, the processing is done with a DoFn indeed.
> 
> > Then it could be some annotation or API that informs the runner. Should
> > batch sizes be fixed in the annotation (element count or size) or should
> > 
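
A sketch of how Kenn's state & timers suggestion could look for this use case
(not code from the thread: batchSize and allowedLateness stand in for
Etienne's design, the input is assumed keyed because state and timers are per
key and window, and package locations follow current Beam):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class BatchingFn<InputT> extends DoFn<KV<String, InputT>, List<InputT>> {
  private final int batchSize;
  private final Duration allowedLateness;

  @StateId("buffer")
  private final StateSpec<BagState<InputT>> bufferSpec = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value();

  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  BatchingFn(int batchSize, Duration allowedLateness) {
    this.batchSize = batchSize;
    this.allowedLateness = allowedLateness;
  }

  @ProcessElement
  public void process(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<InputT> buffer,
      @StateId("count") ValueState<Integer> count,
      @TimerId("expiry") Timer expiry) {
    // Fires once the window has expired, exactly as described above.
    expiry.set(window.maxTimestamp().plus(allowedLateness));
    buffer.add(c.element().getValue());
    Integer stored = count.read();
    int n = (stored == null ? 0 : stored) + 1;
    if (n >= batchSize) {
      c.output(readAndClear(buffer)); // stands in for calling perBatchFn
      count.write(0);
    } else {
      count.write(n);
    }
  }

  @OnTimer("expiry")
  public void onExpiry(
      OnTimerContext c, @StateId("buffer") BagState<InputT> buffer) {
    // Window expired: emit the final, possibly smaller batch.
    List<InputT> remainder = readAndClear(buffer);
    if (!remainder.isEmpty()) {
      c.output(remainder);
    }
  }

  private static <T> List<T> readAndClear(BagState<T> buffer) {
    List<T> batch = new ArrayList<>();
    for (T element : buffer.read()) {
      batch.add(element);
    }
    buffer.clear();
    return batch;
  }
}

Emitting from the timer callback is safe because, as Kenn notes, the output
watermark is held to the expiration point, so the final batch can still be
output into the window.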

Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-26 Thread Thomas Weise
Congrats!


On Thu, Jan 26, 2017 at 7:49 PM, María García Herrero <
mari...@google.com.invalid> wrote:

> Congratulations and thank you for your contributions thus far!
>
> On Thu, Jan 26, 2017 at 6:00 PM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
>
> > Welcome and congratulations!
> >
> > On Thu, Jan 26, 2017 at 5:05 PM, Sourabh Bajaj
> >  wrote:
> > > Congrats!!
> > >
> > > On Thu, Jan 26, 2017 at 5:02 PM Jason Kuster  > invalid>
> > > wrote:
> > >
> > >> Congrats all! Very exciting. :)
> > >>
> > >> On Thu, Jan 26, 2017 at 4:48 PM, Jesse Anderson <
> je...@smokinghand.com>
> > >> wrote:
> > >>
> > >> > Welcome!
> > >> >
> > >> > On Thu, Jan 26, 2017, 7:27 PM Davor Bonaci 
> wrote:
> > >> >
> > >> > > Please join me and the rest of Beam PMC in welcoming the following
> > >> > > contributors as our newest committers. They have significantly
> > >> > contributed
> > >> > > to the project in different ways, and we look forward to many more
> > >> > > contributions in the future.
> > >> > >
> > >> > > * Stas Levin
> > >> > > Stas has contributed across the breadth of the project, from the
> > Spark
> > >> > > runner to the core pieces and Java SDK. Looking at code
> > contributions
> > >> > > alone, he authored 43 commits and reported 25 issues. Stas is very
> > >> active
> > >> > > on the mailing lists too, contributing to good discussions and
> > >> proposing
> > >> > > improvements to the Beam model.
> > >> > >
> > >> > > * Ahmet Altay
> > >> > > Ahmet is a major contributor to the Python SDK, both in terms of
> > design
> > >> > and
> > >> > > code contribution. Looking at code contributions alone, he
> authored
> > 98
> > >> > > commits and reviewed dozens of pull requests. With Python SDK’s
> > >> imminent
> > >> > > merge to the master branch, Ahmet contributed towards
> establishing a
> > >> new
> > >> > > major component in Beam.
> > >> > >
> > >> > > * Pei He
> > >> > > Pei has been contributing to Beam since its inception,
> accumulating
> > a
> > >> > total
> > >> > > of 118 commits since February. He has made several major
> > contributions,
> > >> > > most recently by redesigning IOChannelFactory / FileSystem APIs
> (in
> > >> > > progress), which would extend Beam’s portability to many
> additional
> > >> file
> > >> > > systems and cloud providers.
> > >> > >
> > >> > > Congratulations to all three! Welcome!
> > >> > >
> > >> > > Davor
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> ---
> > >> Jason Kuster
> > >> Apache Beam (Incubating) / Google Cloud Dataflow
> > >>
> >
>


Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-26 Thread Aviem Zur
Congrats!

On Fri, Jan 27, 2017, 06:25 Thomas Weise  wrote:

> Congrats!
>
>
> On Thu, Jan 26, 2017 at 7:49 PM, María García Herrero <
> mari...@google.com.invalid> wrote:
>
> > Congratulations and thank you for your contributions thus far!
> >
> > On Thu, Jan 26, 2017 at 6:00 PM, Robert Bradshaw <
> > rober...@google.com.invalid> wrote:
> >
> > > Welcome and congratulations!
> > >
> > > On Thu, Jan 26, 2017 at 5:05 PM, Sourabh Bajaj
> > >  wrote:
> > > > Congrats!!
> > > >
> > > > On Thu, Jan 26, 2017 at 5:02 PM Jason Kuster  .
> > > invalid>
> > > > wrote:
> > > >
> > > >> Congrats all! Very exciting. :)
> > > >>
> > > >> On Thu, Jan 26, 2017 at 4:48 PM, Jesse Anderson <
> > je...@smokinghand.com>
> > > >> wrote:
> > > >>
> > > >> > Welcome!
> > > >> >
> > > >> > On Thu, Jan 26, 2017, 7:27 PM Davor Bonaci 
> > wrote:
> > > >> >
> > > >> > > Please join me and the rest of Beam PMC in welcoming the
> following
> > > >> > > contributors as our newest committers. They have significantly
> > > >> > contributed
> > > >> > > to the project in different ways, and we look forward to many
> more
> > > >> > > contributions in the future.
> > > >> > >
> > > >> > > * Stas Levin
> > > >> > > Stas has contributed across the breadth of the project, from the
> > > Spark
> > > >> > > runner to the core pieces and Java SDK. Looking at code
> > > contributions
> > > >> > > alone, he authored 43 commits and reported 25 issues. Stas is
> very
> > > >> active
> > > >> > > on the mailing lists too, contributing to good discussions and
> > > >> proposing
> > > >> > > improvements to the Beam model.
> > > >> > >
> > > >> > > * Ahmet Altay
> > > >> > > Ahmet is a major contributor to the Python SDK, both in terms of
> > > design
> > > >> > and
> > > >> > > code contribution. Looking at code contributions alone, he
> > authored
> > > 98
> > > >> > > commits and reviewed dozens of pull requests. With Python SDK’s
> > > >> imminent
> > > >> > > merge to the master branch, Ahmet contributed towards
> > establishing a
> > > >> new
> > > >> > > major component in Beam.
> > > >> > >
> > > >> > > * Pei He
> > > >> > > Pei has been contributing to Beam since its inception,
> > accumulating
> > > a
> > > >> > total
> > > >> > > of 118 commits since February. He has made several major
> > > contributions,
> > > >> > > most recently by redesigning IOChannelFactory / FileSystem APIs
> > (in
> > > >> > > progress), which would extend Beam’s portability to many
> > additional
> > > >> file
> > > >> > > systems and cloud providers.
> > > >> > >
> > > >> > > Congratulations to all three! Welcome!
> > > >> > >
> > > >> > > Davor
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> ---
> > > >> Jason Kuster
> > > >> Apache Beam (Incubating) / Google Cloud Dataflow
> > > >>
> > >
> >
>


Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-26 Thread Jean-Baptiste Onofré
Welcome aboard !⁣

Regards
JB

On Jan 27, 2017, 01:27, at 01:27, Davor Bonaci  wrote:
>Please join me and the rest of Beam PMC in welcoming the following
>contributors as our newest committers. They have significantly
>contributed
>to the project in different ways, and we look forward to many more
>contributions in the future.
>
>* Stas Levin
>Stas has contributed across the breadth of the project, from the Spark
>runner to the core pieces and Java SDK. Looking at code contributions
>alone, he authored 43 commits and reported 25 issues. Stas is very
>active
>on the mailing lists too, contributing to good discussions and
>proposing
>improvements to the Beam model.
>
>* Ahmet Altay
>Ahmet is a major contributor to the Python SDK, both in terms of design
>and
>code contribution. Looking at code contributions alone, he authored 98
>commits and reviewed dozens of pull requests. With Python SDK’s
>imminent
>merge to the master branch, Ahmet contributed towards establishing a
>new
>major component in Beam.
>
>* Pei He
>Pei has been contributing to Beam since its inception, accumulating a
>total
>of 118 commits since February. He has made several major contributions,
>most recently by redesigning IOChannelFactory / FileSystem APIs (in
>progress), which would extend Beam’s portability to many additional
>file
>systems and cloud providers.
>
>Congratulations to all three! Welcome!
>
>Davor


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Jean-Baptiste Onofré
⁣Hi Eugene

A simple way would be to create a BatchedDoFn in an extension.

WDYT ?

Regards
JB

On Jan 26, 2017, 21:48, at 21:48, Eugene Kirpichov 
 wrote:
>I don't think we should make batching a core feature of the Beam
>programming model (by adding it to DoFn as this code snippet implies).
>I'm
>reasonably sure there are less invasive ways of implementing it.
>On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
>wrote:
>
>> Agree, I'm curious as well.
>>
>> I guess it would be something like:
>>
>> .apply(ParDo.of(new DoFn<I, O>() {
>>
>> @Override
>> public long batchSize() {
>>   return 1000;
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext context) {
>>   ...
>> }
>> }));
>>
>> If batchSize (overridden by user) returns a positive long, then DoFn can
>> batch with this size.
>>
>> Regards
>> JB
>>
>> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>> > Hi Etienne,
>> >
>> > Could you post some snippets of how your transform is to be used in
>a
>> > pipeline? I think that would make it easier to discuss on this
>thread and
>> > could save a lot of churn if the discussion ends up leading to a
>> different
>> > API.
>> >
>> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot
>
>> > wrote:
>> >
>> >> Wonderful !
>> >>
>> >> Thanks Kenn !
>> >>
>> >> Etienne
>> >>
>> >>
>> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
>> >>> Hi Etienne,
>> >>>
>> >>> I was drafting a proposal about @OnWindowExpiration when this
>email
>> >>> arrived. I thought I would try to quickly unblock you by
>responding
>> with
>> >> a
>> >>> TL;DR: you can achieve your goals with state & timers as they
>currently
>> >>> exist. You'll set a timer for
>> window.maxTimestamp().plus(allowedLateness)
>> >>> precisely - when this timer fires, you are guaranteed that the
>input
>> >>> watermark has exceeded this point (so all new data is droppable)
>while
>> >> the
>> >>> output timestamp is held to this point (so you can safely output
>into
>> the
>> >>> window).
>> >>>
>> >>> @OnWindowExpiration is (1) a convenience to save you from needing
>a
>> >> handle
>> >>> on the allowed lateness (not a problem in your case) and (2)
>actually
>> >>> meaningful and potentially less expensive to implement in the
>absence
>> of
>> >>> state (this is why it needs a design discussion at all, really).
>> >>>
>> >>> Caveat: these APIs are new and not supported in every runner and
>> >> windowing
>> >>> configuration.
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot
>> >
>> >>> wrote:
>> >>>
>>  Hi,
>> 
>>  I have started to implement this ticket. For now it is
>implemented as
>> a
>>  PTransform that simply does ParDo.of(new DoFn) and all the
>processing
>>  related to batching is done in the DoFn.
>> 
>>  I'm starting to deal with windows and bundles (starting to take
>a look
>> >> at
>>  the State API to process trans-bundles, more questions about
>this to
>> >> come).
>>  My comments/questions are inline:
>> 
>> 
>>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>> 
>> > We should start by understanding the goals. If elements are in different
>> > windows can they be put in the same batch? If they have different
>> > timestamps what timestamp should the batch have?
>> >
>>  Regarding timestamps: currently the design is as follows: the transform
>>  does not group elements in the PCollection, so the "batch" does not exist
>>  as an element in the PCollection. There is only a user-defined function
>>  (perBatchFn) that gets called when batchSize elements have been processed.
>>  This function takes an ArrayList as a parameter, so elements keep their
>>  original timestamps.
>> 
>> 
>>  Regarding windowing: I guess that if elements are not in the same window,
>>  they are not expected to be in the same batch.
>>  I'm just starting to work on these subjects, so I might lack a bit of
>>  information;
>>  what I am currently thinking about is that I need a way to know in the
>>  DoFn that the window has expired so that I can call the perBatchFn even if
>>  batchSize is not reached. This is the @OnWindowExpiration callback that
>>  Kenneth mentioned in an email about bundles.
>>  Let's imagine that we have a collection of elements artificially
>>  timestamped every 10 seconds (for simplicity of the example) and a fixed
>>  windowing of 1 minute. Then each window contains 6 elements. If we were to
>>  buffer the elements in batches of 5 elements, then for each window we
>>  expect to get 2 batches (one of 5 elements, one of 1 element). For that to
>>  happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn
>> 
>>  As a composite transform 

Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-26 Thread Kobi Salant
Congrats! Well deserved Stas

On Jan 27, 2017 7:26, "Frances Perry"  wrote:

> Woohoo! Congrats ;-)
>
> On Thu, Jan 26, 2017 at 9:05 PM, Jean-Baptiste Onofré 
> wrote:
>
> > Welcome aboard !⁣
> >
> > Regards
> > JB
> >
> > On Jan 27, 2017, 01:27, at 01:27, Davor Bonaci  wrote:
> > >Please join me and the rest of Beam PMC in welcoming the following
> > >contributors as our newest committers. They have significantly
> > >contributed
> > >to the project in different ways, and we look forward to many more
> > >contributions in the future.
> > >
> > >* Stas Levin
> > >Stas has contributed across the breadth of the project, from the Spark
> > >runner to the core pieces and Java SDK. Looking at code contributions
> > >alone, he authored 43 commits and reported 25 issues. Stas is very
> > >active
> > >on the mailing lists too, contributing to good discussions and
> > >proposing
> > >improvements to the Beam model.
> > >
> > >* Ahmet Altay
> > >Ahmet is a major contributor to the Python SDK, both in terms of design
> > >and
> > >code contribution. Looking at code contributions alone, he authored 98
> > >commits and reviewed dozens of pull requests. With Python SDK’s
> > >imminent
> > >merge to the master branch, Ahmet contributed towards establishing a
> > >new
> > >major component in Beam.
> > >
> > >* Pei He
> > >Pei has been contributing to Beam since its inception, accumulating a
> > >total
> > >of 118 commits since February. He has made several major contributions,
> > >most recently by redesigning IOChannelFactory / FileSystem APIs (in
> > >progress), which would extend Beam’s portability to many additional
> > >file
> > >systems and cloud providers.
> > >
> > >Congratulations to all three! Welcome!
> > >
> > >Davor
> >
>


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
On Thu, Jan 26, 2017 at 6:58 PM, Kenneth Knowles  
wrote:
> On Thu, Jan 26, 2017 at 4:15 PM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
>
>> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
>>  wrote:
>> >
>> > you can't wrap DoFn's, period
>>
>> As a simple example, given a DoFn<I, O> it's perfectly natural to want
>> to "wrap" this as a DoFn<KV<K, I>, KV<K, O>>. State, side inputs,
>> windows, etc. would just be passed through.
>
>
>> The fact that this is complicated, with reflection and flexible
>> signatures and byte generation, is a property of the SDK (to provide a
>> flexible DoFn API). I agree that it's nice to hide this complexity
>> from the user, and it discourages this kind of composability.
>>
>
> 
> The difficulty of this sort of composability is a particularly bothersome
> issue for DoFn. It is solvable but the solutions may seem esoteric.
>
>  - Supporting wrapped _invocation_ is actually as easy as before if we
> chose to embrace it: ArgumentProvider is roughly the same thing as ye olde
> ProcessContext. We can easily provide it via our parameter mechanism, and
> DoFnInvokers can be used or we can also provide a DoFnInvoker for some
> requested DoFn.
>
>  - Supporting wrapped analysis is a bit uglier but I don't think there are
> technical blockers. It was actually quite bad prior to the new DoFn - you
> couldn't know statically whether the wrapper class had to "implements
> RequiresWindowAccess" so you would just have to do it "just in case". With
> the new DoFn I could imagine a `@Wraps public DoFn getWrapped()` though
> we'd still have to be able to merge this with whatever the wrapper requires
> - for example if they both use state there will be nothing calling out the
> fact that the wrapper needs to create a disjoint namespace. We could even
> make this required if there is an ArgumentProvider parameter and/or
> automatically provide a DoFnInvoker for this DoFn (so DoFnInvoker would be
> in the core SDK and DoFnInvokers would be only in the SDK Fn harness).
>
> So I think the analysis problems are fundamental, not part of the
> particulars of the new DoFn API or any particular SDK.
> 


My point is that the DoFn as a concept in the Beam model is
fundamentally, though not perfectly, composable. Both invocation and
analysis are functions of the SDK, and solvable, though perhaps not
easily (and/or requiring what would normally be considered
implementation details).


> Coming back to the ticket... I'm going to echo Ben's early point, and now
> Eugene's and Robert's that we should enumerate further use cases explicitly
> and preferably add them to the JIRA.
>
> Both SO questions are answered with essentially an inlined
> PTransform<PCollection<T>, PCollection<Iterable<T>>> with a maximum
> iterable size, for batched RPCs downstream. You
> can easily build timestamped and reified-windows versions without that
> transform being aware of it. It is simple to understand, as easy to
> implement a unified-model version via state & timers as those SO answers,
> and doesn't require DoFn nesting.

I think Eugene surfaced the key point, which is that it depends on what
one does with the output.

Both of these emit "result" in the timestamp and window of whatever
the last element of the batch was, regardless of the other elements.
Of course they'll both break at runtime (with high probability) for
inputs windowed by anything but timestamp-invariant WindowFns like
GlobalWindows as the emit in finalize won't have an ambient window or
timestamp to assign to its output.

This is fine for writes or other terminal nodes without output (though
even writes may have output).

> I would love to learn more about use
> cases that either debunk this or refine it.

Anytime one wants to consume the output in a non-globally-windowed,
non-uniform-timestamp way is broken with the SO answers above. In
particular, streaming. (Adapting these to use state would restrict
batches to be per-window, per-key, and still not respect timestamps.)

The use cases I gave are ones where the batch computation is followed by an
"unbatch." Put another way, the batching allows amortization of
processing cost for logically independent elements. One would have to
do this by reifying windows and timestamps (holding back the timestamp
to the earliest member of the batch), storing elements in an
accumulator (per-bundle locally or cross-bundle using
(per-key-window(?)) state) processing the batch, and then restoring
windows and timestamp. However, the user of the API shouldn't have to
manually deal with the window-reified elements, i.e. they should
provide a List<T> -> List<O> DoFn rather than a
List<WindowedValue<T>> -> List<WindowedValue<O>> one, which is why I think
nesting, or at least wrapping, is required.

Any outputs other than 1:0 or 1:1 require more manual thought as to
what the window and timestamp of the output should be (or restriction
of batches to single windows/timestamps, or rounding up timestamps to
the end of 
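
To make the wrapping shape concrete, here is a sketch of the key-preserving
pass-through discussed above, with the wrapped logic simplified to a plain
SerializableFunction rather than a full DoFn (which side-steps the invocation
and analysis problems this thread describes; names are placeholders):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;

class KeyedMapFn<K, I, O> extends DoFn<KV<K, I>, KV<K, O>> {
  private final SerializableFunction<I, O> fn;

  KeyedMapFn(SerializableFunction<I, O> fn) {
    this.fn = fn;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Key, window, and timestamp pass through untouched; only the value is
    // transformed, so the output is 1:1 and needs no timestamp reasoning.
    c.output(KV.of(c.element().getKey(), fn.apply(c.element().getValue())));
  }
}

Lifting a full DoFn rather than a simple function is exactly where the
analysis problems above appear: the wrapper cannot easily declare the state,
timers, or window access of what it wraps.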

Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-26 Thread Jason Kuster
Congrats all! Very exciting. :)

On Thu, Jan 26, 2017 at 4:48 PM, Jesse Anderson 
wrote:

> Welcome!
>
> On Thu, Jan 26, 2017, 7:27 PM Davor Bonaci  wrote:
>
> > Please join me and the rest of Beam PMC in welcoming the following
> > contributors as our newest committers. They have significantly
> contributed
> > to the project in different ways, and we look forward to many more
> > contributions in the future.
> >
> > * Stas Levin
> > Stas has contributed across the breadth of the project, from the Spark
> > runner to the core pieces and Java SDK. Looking at code contributions
> > alone, he authored 43 commits and reported 25 issues. Stas is very active
> > on the mailing lists too, contributing to good discussions and proposing
> > improvements to the Beam model.
> >
> > * Ahmet Altay
> > Ahmet is a major contributor to the Python SDK, both in terms of design
> and
> > code contribution. Looking at code contributions alone, he authored 98
> > commits and reviewed dozens of pull requests. With Python SDK’s imminent
> > merge to the master branch, Ahmet contributed towards establishing a new
> > major component in Beam.
> >
> > * Pei He
> > Pei has been contributing to Beam since its inception, accumulating a
> total
> > of 118 commits since February. He has made several major contributions,
> > most recently by redesigning IOChannelFactory / FileSystem APIs (in
> > progress), which would extend Beam’s portability to many additional file
> > systems and cloud providers.
> >
> > Congratulations to all three! Welcome!
> >
> > Davor
> >
>



-- 
---
Jason Kuster
Apache Beam (Incubating) / Google Cloud Dataflow


Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-26 Thread Sourabh Bajaj
Congrats!!

On Thu, Jan 26, 2017 at 5:02 PM Jason Kuster 
wrote:

> Congrats all! Very exciting. :)
>
> On Thu, Jan 26, 2017 at 4:48 PM, Jesse Anderson 
> wrote:
>
> > Welcome!
> >
> > On Thu, Jan 26, 2017, 7:27 PM Davor Bonaci  wrote:
> >
> > > Please join me and the rest of Beam PMC in welcoming the following
> > > contributors as our newest committers. They have significantly
> > contributed
> > > to the project in different ways, and we look forward to many more
> > > contributions in the future.
> > >
> > > * Stas Levin
> > > Stas has contributed across the breadth of the project, from the Spark
> > > runner to the core pieces and Java SDK. Looking at code contributions
> > > alone, he authored 43 commits and reported 25 issues. Stas is very
> active
> > > on the mailing lists too, contributing to good discussions and
> proposing
> > > improvements to the Beam model.
> > >
> > > * Ahmet Altay
> > > Ahmet is a major contributor to the Python SDK, both in terms of design
> > and
> > > code contribution. Looking at code contributions alone, he authored 98
> > > commits and reviewed dozens of pull requests. With Python SDK’s
> imminent
> > > merge to the master branch, Ahmet contributed towards establishing a
> new
> > > major component in Beam.
> > >
> > > * Pei He
> > > Pei has been contributing to Beam since its inception, accumulating a
> > total
> > > of 118 commits since February. He has made several major contributions,
> > > most recently by redesigning IOChannelFactory / FileSystem APIs (in
> > > progress), which would extend Beam’s portability to many additional
> file
> > > systems and cloud providers.
> > >
> > > Congratulations to all three! Welcome!
> > >
> > > Davor
> > >
> >
>
>
>
> --
> ---
> Jason Kuster
> Apache Beam (Incubating) / Google Cloud Dataflow
>


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
On Thu, Jan 26, 2017 at 4:20 PM, Ben Chambers
 wrote:
> Here's an example API that would make this part of a DoFn. The idea here is
> that it would still be run as `ParDo.of(new MyBatchedDoFn())`, but the
> runner (and DoFnRunner) could see that it has asked for batches, so rather
> than calling a `processElement` on every input `I`, it assembles a
> `Collection<I>` and then calls the method.
>
> Possible API making this part of DoFn (with a fixed size):
>
> public class MyBatchedDoFn extends DoFn<I, O> {
>   @ProcessBatch(size = 50)
>   public void processBatch(ProcessContext c) {
>     Collection<I> batchContents = c.element();
>     ...
>   }
> }
>
> Possible API making this part of DoFn (with dynamic size):
>
> public class MyBatchedDoFn extends DoFn<I, O> {
>   @ProcessBatch
>   public boolean processBatch(ProcessContext c) {
>     Collection<I> batchContents = c.element();
>     if (batchContents.size() < 50) {
>       return false; // batch not yet processed
>     }
>
>     ...
>     return true;
>   }
> }

Or even

public class MyBatchedDoFn extends DoFn<I, O> {
  public void processElement(Iterable<I> batch) {
    [process the batch]
  }
}

though I'd rather this not be baked into the DoFn API if it can be
solved separately.


Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-26 Thread Robert Bradshaw
Welcome and congratulations!

On Thu, Jan 26, 2017 at 5:05 PM, Sourabh Bajaj
 wrote:
> Congrats!!
>
> On Thu, Jan 26, 2017 at 5:02 PM Jason Kuster 
> wrote:
>
>> Congrats all! Very exciting. :)
>>
>> On Thu, Jan 26, 2017 at 4:48 PM, Jesse Anderson 
>> wrote:
>>
>> > Welcome!
>> >
>> > On Thu, Jan 26, 2017, 7:27 PM Davor Bonaci  wrote:
>> >
>> > > Please join me and the rest of Beam PMC in welcoming the following
>> > > contributors as our newest committers. They have significantly
>> > contributed
>> > > to the project in different ways, and we look forward to many more
>> > > contributions in the future.
>> > >
>> > > * Stas Levin
>> > > Stas has contributed across the breadth of the project, from the Spark
>> > > runner to the core pieces and Java SDK. Looking at code contributions
>> > > alone, he authored 43 commits and reported 25 issues. Stas is very
>> active
>> > > on the mailing lists too, contributing to good discussions and
>> proposing
>> > > improvements to the Beam model.
>> > >
>> > > * Ahmet Altay
>> > > Ahmet is a major contributor to the Python SDK, both in terms of design
>> > and
>> > > code contribution. Looking at code contributions alone, he authored 98
>> > > commits and reviewed dozens of pull requests. With Python SDK’s
>> imminent
>> > > merge to the master branch, Ahmet contributed towards establishing a
>> new
>> > > major component in Beam.
>> > >
>> > > * Pei He
>> > > Pei has been contributing to Beam since its inception, accumulating a
>> > total
>> > > of 118 commits since February. He has made several major contributions,
>> > > most recently by redesigning IOChannelFactory / FileSystem APIs (in
>> > > progress), which would extend Beam’s portability to many additional
>> file
>> > > systems and cloud providers.
>> > >
>> > > Congratulations to all three! Welcome!
>> > >
>> > > Davor
>> > >
>> >
>>
>>
>>
>> --
>> ---
>> Jason Kuster
>> Apache Beam (Incubating) / Google Cloud Dataflow
>>


Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-26 Thread Jesse Anderson
Welcome!

On Thu, Jan 26, 2017, 7:27 PM Davor Bonaci  wrote:

> Please join me and the rest of Beam PMC in welcoming the following
> contributors as our newest committers. They have significantly contributed
> to the project in different ways, and we look forward to many more
> contributions in the future.
>
> * Stas Levin
> Stas has contributed across the breadth of the project, from the Spark
> runner to the core pieces and Java SDK. Looking at code contributions
> alone, he authored 43 commits and reported 25 issues. Stas is very active
> on the mailing lists too, contributing to good discussions and proposing
> improvements to the Beam model.
>
> * Ahmet Altay
> Ahmet is a major contributor to the Python SDK, both in terms of design and
> code contribution. Looking at code contributions alone, he authored 98
> commits and reviewed dozens of pull requests. With Python SDK’s imminent
> merge to the master branch, Ahmet contributed towards establishing a new
> major component in Beam.
>
> * Pei He
> Pei has been contributing to Beam since its inception, accumulating a total
> of 118 commits since February. He has made several major contributions,
> most recently by redesigning IOChannelFactory / FileSystem APIs (in
> progress), which would extend Beam’s portability to many additional file
> systems and cloud providers.
>
> Congratulations to all three! Welcome!
>
> Davor
>


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Ben Chambers
I think that wrapping the DoFn is tricky -- we backed out
IntraBundleParallelization because it did that, and it has weird
interactions with both the reflective DoFn and windowing. We could maybe
make some kind of "DoFnDelegatingDoFn" that could act as a base class and
get some of that right, but...

One question I have is whether this batching should be "make batches of N
and if you need to wait for the Nth element do so" or "make batches of at
most N but don't wait too long if you don't get to N". In the former case,
we'll need to do something to buffer elements between bundles -- whether
this is using State or a GroupByKey, etc. In the latter case, the buffering
can happen entirely within a bundle -- if you get to the end of the bundle
and only have 5 elements, even if 5 < N, process that as a batch (rather
than shifting it somewhere else).
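
A sketch of the second, within-bundle variant (not from the thread; the
FinishBundleContext signature postdates this discussion, and borrowing the
last element's window and timestamp carries exactly the caveat raised
elsewhere in this thread for anything but GlobalWindows):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class PerBundleBatchFn<T> extends DoFn<T, List<T>> {
  private final int maxBatchSize;
  private transient List<T> buffer;
  private transient Instant lastTimestamp;
  private transient BoundedWindow lastWindow;

  PerBundleBatchFn(int maxBatchSize) {
    this.maxBatchSize = maxBatchSize;
  }

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    buffer.add(c.element());
    lastTimestamp = c.timestamp();
    lastWindow = window;
    if (buffer.size() >= maxBatchSize) {
      c.output(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    // A short bundle may flush fewer than maxBatchSize elements; that is the
    // "at most N" semantics. Reusing the last element's timestamp/window is
    // only really safe under GlobalWindows.
    if (!buffer.isEmpty()) {
      c.output(new ArrayList<>(buffer), lastTimestamp, lastWindow);
    }
  }
}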

On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw 
wrote:

> On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
>  wrote:
> > I don't think we should make batching a core feature of the Beam
> > programming model (by adding it to DoFn as this code snippet implies).
> I'm
> > reasonably sure there are less invasive ways of implementing it.
>
> +1, either as a PTransform or a DoFn that
> wraps/delegates to a DoFn.
>
> > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
> > wrote:
> >
> >> Agree, I'm curious as well.
> >>
> >> I guess it would be something like:
> >>
> > >> .apply(ParDo.of(new DoFn<I, O>() {
> >>
> >> @Override
> >> public long batchSize() {
> >>   return 1000;
> >> }
> >>
> >> @ProcessElement
> >> public void processElement(ProcessContext context) {
> >>   ...
> >> }
> >> }));
> >>
> > >> If batchSize (overridden by user) returns a positive long, then DoFn can
> >> batch with this size.
> >>
> >> Regards
> >> JB
> >>
> >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> >> > Hi Etienne,
> >> >
> >> > Could you post some snippets of how your transform is to be used in a
> >> > pipeline? I think that would make it easier to discuss on this thread
> and
> >> > could save a lot of churn if the discussion ends up leading to a
> >> different
> >> > API.
> >> >
> >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot  >
> >> > wrote:
> >> >
> >> >> Wonderful !
> >> >>
> >> >> Thanks Kenn !
> >> >>
> >> >> Etienne
> >> >>
> >> >>
> >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> >> >>> Hi Etienne,
> >> >>>
> >> >>> I was drafting a proposal about @OnWindowExpiration when this email
> >> >>> arrived. I thought I would try to quickly unblock you by responding
> >> with
> >> >> a
> >> >>> TL;DR: you can achieve your goals with state & timers as they
> currently
> >> >>> exist. You'll set a timer for
> >> window.maxTimestamp().plus(allowedLateness)
> >> >>> precisely - when this timer fires, you are guaranteed that the input
> >> >>> watermark has exceeded this point (so all new data is droppable)
> while
> >> >> the
> >> >>> output timestamp is held to this point (so you can safely output
> into
> >> the
> >> >>> window).
> >> >>>
> >> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
> >> >> handle
> >> >>> on the allowed lateness (not a problem in your case) and (2)
> actually
> >> >>> meaningful and potentially less expensive to implement in the
> absence
> >> of
> >> >>> state (this is why it needs a design discussion at all, really).
> >> >>>
> >> >>> Caveat: these APIs are new and not supported in every runner and
> >> >> windowing
> >> >>> configuration.
> >> >>>
> >> >>> Kenn
> >> >>>
> >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <
> echauc...@gmail.com
> >> >
> >> >>> wrote:
> >> >>>
> >>  Hi,
> >> 
> >>  I have started to implement this ticket. For now it is implemented
> as
> >> a
> >>  PTransform that simply does ParDo.of(new DoFn) and all the
> processing
> >>  related to batching is done in the DoFn.
> >> 
> >>  I'm starting to deal with windows and bundles (starting to take a
> look
> >> >> at
> >>  the State API to process trans-bundles, more questions about this
> to
> >> >> come).
> >>  My comments/questions are inline:
> >> 
> >> 
> >>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
> >> 
> > >> > We should start by understanding the goals. If elements are in different
> > >> > windows can they be put in the same batch? If they have different
> > >> > timestamps what timestamp should the batch have?
> >> >
> > >>  Regarding timestamps: currently the design is as follows: the transform
> > >>  does not group elements in the PCollection, so the "batch" does not
> > >>  exist as an element in the PCollection. There is only a user-defined
> > >>  function (perBatchFn) that gets called when batchSize elements have
> > >>  been processed.
> >>  This function takes an 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
On Thu, Jan 26, 2017 at 3:31 PM, Ben Chambers
 wrote:
> I think that wrapping the DoFn is tricky -- we backed out
> IntraBundleParallelization because it did that, and it has weird
> interactions with both the reflective DoFn and windowing. We could maybe
> make some kind of "DoFnDelegatingDoFn" that could act as a base class and
> get some of that right, but...

Yeah, this is a lot trickier with NewDoFn. Which is unfortunate, as
this isn't the only case where we want to make DoFns more composable.

> One question I have is whether this batching should be "make batches of N
> and if you need to wait for the Nth element do so" or "make batches of at
> most N but don't wait too long if you don't get to N". In the former case,
> we'll need to do something to buffer elements between bundles -- whether
> this is using State or a GroupByKey, etc. In the latter case, the buffering
> can happen entirely within a bundle -- if you get to the end of the bundle
> and only have 5 elements, even if 5 < N, process that as a batch (rather
> than shifting it somewhere else).

I think the "make batches of at most N but don't wait too long if you
don't get to N" is a very useful first (and tractable) start that can
be built on.

> On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw 
> wrote:
>
>> On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
>>  wrote:
>> > I don't think we should make batching a core feature of the Beam
>> > programming model (by adding it to DoFn as this code snippet implies).
>> I'm
>> > reasonably sure there are less invasive ways of implementing it.
>>
>> +1, either as a PTransform or a DoFn that
>> wraps/delegates to a DoFn.
>>
>> > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
>> > wrote:
>> >
>> >> Agree, I'm curious as well.
>> >>
>> >> I guess it would be something like:
>> >>
>> >> .apply(ParDo.of(new DoFn<I, O>() {
>> >>
>> >> @Override
>> >> public long batchSize() {
>> >>   return 1000;
>> >> }
>> >>
>> >> @ProcessElement
>> >> public void processElement(ProcessContext context) {
>> >>   ...
>> >> }
>> >> }));
>> >>
>> >> If batchSize (overridden by user) returns a positive long, then DoFn can
>> >> batch with this size.
>> >>
>> >> Regards
>> >> JB
>> >>
>> >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>> >> > Hi Etienne,
>> >> >
>> >> > Could you post some snippets of how your transform is to be used in a
>> >> > pipeline? I think that would make it easier to discuss on this thread
>> and
>> >> > could save a lot of churn if the discussion ends up leading to a
>> >> different
>> >> > API.
>> >> >
>> >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot > >
>> >> > wrote:
>> >> >
>> >> >> Wonderful !
>> >> >>
>> >> >> Thanks Kenn !
>> >> >>
>> >> >> Etienne
>> >> >>
>> >> >>
>> >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
>> >> >>> Hi Etienne,
>> >> >>>
>> >> >>> I was drafting a proposal about @OnWindowExpiration when this email
>> >> >>> arrived. I thought I would try to quickly unblock you by responding
>> >> with
>> >> >> a
>> >> >>> TL;DR: you can achieve your goals with state & timers as they
>> currently
>> >> >>> exist. You'll set a timer for
>> >> window.maxTimestamp().plus(allowedLateness)
>> >> >>> precisely - when this timer fires, you are guaranteed that the input
>> >> >>> watermark has exceeded this point (so all new data is droppable)
>> while
>> >> >> the
>> >> >>> output timestamp is held to this point (so you can safely output
>> into
>> >> the
>> >> >>> window).
>> >> >>>
>> >> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
>> >> >> handle
>> >> >>> on the allowed lateness (not a problem in your case) and (2)
>> actually
>> >> >>> meaningful and potentially less expensive to implement in the
>> absence
>> >> of
>> >> >>> state (this is why it needs a design discussion at all, really).
>> >> >>>
>> >> >>> Caveat: these APIs are new and not supported in every runner and
>> >> >> windowing
>> >> >>> configuration.
>> >> >>>
>> >> >>> Kenn
>> >> >>>
>> >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <
>> echauc...@gmail.com
>> >> >
>> >> >>> wrote:
>> >> >>>
>> >>  Hi,
>> >> 
>> >>  I have started to implement this ticket. For now it is implemented
>> as
>> >> a
>> >>  PTransform that simply does ParDo.of(new DoFn) and all the
>> processing
>> >>  related to batching is done in the DoFn.
>> >> 
>> >>  I'm starting to deal with windows and bundles (starting to take a
>> look
>> >> >> at
>> >>  the State API to process trans-bundles, more questions about this
>> to
>> >> >> come).
>> >>  My comments/questions are inline:
>> >> 
>> >> 
>> >>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>> >> 
>> >> > We should start by understanding the goals. If elements are in
>> >> >> different

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Eugene Kirpichov
I agree that wrapping the DoFn is probably not the way to go, because the
DoFn may be quite tricky due to all the reflective features: e.g. how do
you automatically "batch" a DoFn that uses state and timers? What about a
DoFn that uses a BoundedWindow parameter? What about a splittable DoFn?
What about future reflective features? The class for invoking DoFn's,
DoFnInvokers, is absent from the SDK (and present in runners-core) for a
good reason.

I'd rather leave the intricacies of invoking DoFn's to runners, and say
that you can't wrap DoFn's, period - "adapter", "decorator" and other
design patterns just don't apply to DoFn's.

The two options for batching are:
- A transform that takes elements and produces batches, like Robert said
- A simple Beam-agnostic library that takes Java objects and produces
batches of Java objects, with an API that makes it convenient to use in a
typical batching DoFn
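
A minimal sketch of that second option, a Beam-agnostic helper (names are
placeholders, not an existing library):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class Batcher<T> {
  private final int batchSize;
  private final Consumer<List<T>> onBatch;
  private final List<T> buffer = new ArrayList<>();

  Batcher(int batchSize, Consumer<List<T>> onBatch) {
    this.batchSize = batchSize;
    this.onBatch = onBatch;
  }

  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  void flush() {
    // Hands the caller a copy so the internal buffer can be reused.
    if (!buffer.isEmpty()) {
      onBatch.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}

A DoFn would call add() from @ProcessElement and flush() from @FinishBundle,
keeping all the Beam-specific decisions (windows, timestamps, output) inside
the DoFn itself.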

On Thu, Jan 26, 2017 at 3:31 PM Ben Chambers 
wrote:

> I think that wrapping the DoFn is tricky -- we backed out
> IntraBundleParallelization because it did that, and it has weird
> interactions with both the reflective DoFn and windowing. We could maybe
> make some kind of "DoFnDelegatingDoFn" that could act as a base class and
> get some of that right, but...
>
> One question I have is whether this batching should be "make batches of N
> and if you need to wait for the Nth element do so" or "make batches of at
> most N but don't wait too long if you don't get to N". In the former case,
> we'll need to do something to buffer elements between bundles -- whether
> this is using State or a GroupByKey, etc. In the latter case, the buffering
> can happen entirely within a bundle -- if you get to the end of the bundle
> and only have 5 elements, even if 5 < N, process that as a batch (rather
> than shifting it somewhere else).
>
> On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw
> 
> wrote:
>
> > On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
> >  wrote:
> > > I don't think we should make batching a core feature of the Beam
> > > programming model (by adding it to DoFn as this code snippet implies).
> > I'm
> > > reasonably sure there are less invasive ways of implementing it.
> >
> > +1, either as a PTransform or a DoFn that
> > wraps/delegates to a DoFn.
> >
> > > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré  >
> > > wrote:
> > >
> > >> Agree, I'm curious as well.
> > >>
> > >> I guess it would be something like:
> > >>
> > >> .apply(ParDo.of(new DoFn<I, O>() {
> > >>
> > >> @Override
> > >> public long batchSize() {
> > >>   return 1000;
> > >> }
> > >>
> > >> @ProcessElement
> > >> public void processElement(ProcessContext context) {
> > >>   ...
> > >> }
> > >> }));
> > >>
> > >> If batchSize (overridden by user) returns a positive long, then DoFn can
> > >> batch with this size.
> > >>
> > >> Regards
> > >> JB
> > >>
> > >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> > >> > Hi Etienne,
> > >> >
> > >> > Could you post some snippets of how your transform is to be used in
> a
> > >> > pipeline? I think that would make it easier to discuss on this
> thread
> > and
> > >> > could save a lot of churn if the discussion ends up leading to a
> > >> different
> > >> > API.
> > >> >
> > >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <
> echauc...@gmail.com
> > >
> > >> > wrote:
> > >> >
> > >> >> Wonderful !
> > >> >>
> > >> >> Thanks Kenn !
> > >> >>
> > >> >> Etienne
> > >> >>
> > >> >>
> > >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> > >> >>> Hi Etienne,
> > >> >>>
> > >> >>> I was drafting a proposal about @OnWindowExpiration when this
> email
> > >> >>> arrived. I thought I would try to quickly unblock you by
> responding
> > >> with
> > >> >> a
> > >> >>> TL;DR: you can achieve your goals with state & timers as they
> > currently
> > >> >>> exist. You'll set a timer for
> > >> window.maxTimestamp().plus(allowedLateness)
> > >> >>> precisely - when this timer fires, you are guaranteed that the
> input
> > >> >>> watermark has exceeded this point (so all new data is droppable)
> > while
> > >> >> the
> > >> >>> output timestamp is held to this point (so you can safely output
> > into
> > >> the
> > >> >>> window).
> > >> >>>
> > >> >>> @OnWindowExpiration is (1) a convenience to save you from needing
> a
> > >> >> handle
> > >> >>> on the allowed lateness (not a problem in your case) and (2)
> > actually
> > >> >>> meaningful and potentially less expensive to implement in the
> > absence
> > >> of
> > >> >>> state (this is why it needs a design discussion at all, really).
> > >> >>>
> > >> >>> Caveat: these APIs are new and not supported in every runner and
> > >> >> windowing
> > >> >>> configuration.
> > >> >>>
> > >> >>> Kenn
> > >> >>>
> > >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <
> > 

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Kenneth Knowles
On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> The class for invoking DoFn's,
> DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> good reason.
>

This would be true if it weren't for that pesky DoFnTester :-)

And even if we solve that problem, in the future it will be in the SDK's Fn
Harness.


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Ben Chambers
The third option for batching:

- Functionality within the DoFn and DoFnRunner built as part of the SDK.

I haven't thought through Batching, but at least for the
IntraBundleParallelization use case this actually does make sense to expose
as a part of the model. Knowing that a DoFn supports parallelization, a
runner may want to control how much parallelization is allowed, and the
DoFn also needs to make sure to wait on all those threads (and make sure
they're properly setup for logging/metrics/etc. associated with the current
step).

There may be good reasons to make this a property of a DoFn that the runner
can inspect, and support. For instance, if a DoFn wants to process batches
of 50, it may be possible to factor that into how input is split/bundled.

On Thu, Jan 26, 2017 at 3:49 PM Kenneth Knowles 
wrote:

> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > The class for invoking DoFn's,
> > DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> > good reason.
> >
>
> This would be true if it weren't for that pesky DoFnTester :-)
>
> And even if we solve that problem, in the future it will be in the SDK's Fn
> Harness.
>


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Robert Bradshaw
On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
 wrote:
> I don't think we should make batching a core feature of the Beam
> programming model (by adding it to DoFn as this code snippet implies). I'm
> reasonably sure there are less invasive ways of implementing it.

+1, either as a PTransform or a DoFn that
wraps/delegates to a DoFn.

> On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
> wrote:
>
>> Agree, I'm curious as well.
>>
>> I guess it would be something like:
>>
>> .apply(ParDo.of(new DoFn<I, O>() {
>>
>> @Override
>> public long batchSize() {
>>   return 1000;
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext context) {
>>   ...
>> }
>> }));
>>
>> If batchSize (overridden by user) returns a positive long, then DoFn can
>> batch with this size.
>>
>> Regards
>> JB
>>
>> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>> > Hi Etienne,
>> >
>> > Could you post some snippets of how your transform is to be used in a
>> > pipeline? I think that would make it easier to discuss on this thread and
>> > could save a lot of churn if the discussion ends up leading to a
>> different
>> > API.
>> >
>> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot 
>> > wrote:
>> >
>> >> Wonderful !
>> >>
>> >> Thanks Kenn !
>> >>
>> >> Etienne
>> >>
>> >>
>> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
>> >>> Hi Etienne,
>> >>>
>> >>> I was drafting a proposal about @OnWindowExpiration when this email
>> >>> arrived. I thought I would try to quickly unblock you by responding
>> with
>> >> a
>> >>> TL;DR: you can achieve your goals with state & timers as they currently
>> >>> exist. You'll set a timer for
>> window.maxTimestamp().plus(allowedLateness)
>> >>> precisely - when this timer fires, you are guaranteed that the input
>> >>> watermark has exceeded this point (so all new data is droppable) while
>> >> the
>> >>> output timestamp is held to this point (so you can safely output into
>> the
>> >>> window).
>> >>>
>> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
>> >> handle
>> >>> on the allowed lateness (not a problem in your case) and (2) actually
>> >>> meaningful and potentially less expensive to implement in the absence
>> of
>> >>> state (this is why it needs a design discussion at all, really).
>> >>>
>> >>> Caveat: these APIs are new and not supported in every runner and
>> >> windowing
>> >>> configuration.
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot > >
>> >>> wrote:
>> >>>
>>  Hi,
>> 
>>  I have started to implement this ticket. For now it is implemented as
>> a
>>  PTransform that simply does ParDo.of(new DoFn) and all the processing
>>  related to batching is done in the DoFn.
>> 
>>  I'm starting to deal with windows and bundles (starting to take a look
>> >> at
>>  the State API to process trans-bundles, more questions about this to
>> >> come).
>>  My comments/questions are inline:
>> 
>> 
>>  Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>> 
>> > We should start by understanding the goals. If elements are in different
>> > windows can they be put in the same batch? If they have different
>> > timestamps what timestamp should the batch have?
>> >
>>  Regarding timestamps: currently the design is as follows: the transform
>>  does not group elements in the PCollection, so the "batch" does not exist
>>  as an element in the PCollection. There is only a user-defined function
>>  (perBatchFn) that gets called when batchSize elements have been processed.
>>  This function takes an ArrayList as a parameter, so elements keep their
>>  original timestamps.
>> 
>> 
>>  Regarding windowing: I guess that if elements are not in the same window,
>>  they are not expected to be in the same batch.
>>  I'm just starting to work on these subjects, so I might lack a bit of
>>  information;
>>  what I am currently thinking about is that I need a way to know in the
>>  DoFn that the window has expired so that I can call the perBatchFn even if
>>  batchSize is not reached. This is the @OnWindowExpiration callback that
>>  Kenneth mentioned in an email about bundles.
>>  Let's imagine that we have a collection of elements artificially
>>  timestamped every 10 seconds (for simplicity of the example) and a fixed
>>  windowing of 1 minute. Then each window contains 6 elements. If we were to
>>  buffer the elements in batches of 5 elements, then for each window we
>>  expect to get 2 batches (one of 5 elements, one of 1 element). For that to
>>  happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn
>> 
>>  As a composite transform this will likely