Re: TestStream and stateful processing

2017-09-13 Thread Thomas Weise
Since how the state is mutated depends on the DoFn implementation, wouldn't
this require the DoFn author to supply the merge logic?

Something like

    @Merge
    merge(@StateId("myCount") ValueState merged,
          @StateId("myCount") ValueState[] src)

Without such support in Beam, what workaround would be suitable to mimic the
effect of a session window with a stateful DoFn in user land? Perhaps using
the global window and managing the gap in the DoFn? The problem is that
garbage collection would not occur, with unlimited growth of state as a result.
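
For concreteness, the bookkeeping such a user-land workaround would need can
be sketched in plain Java (a hypothetical helper, not Beam API): track the
last-seen event timestamp per key, start a new "session" when the gap is
exceeded, and expose the expiry time at which a cleanup timer could clear the
state.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper sketching the per-key bookkeeping a stateful DoFn over
// the global window would keep to mimic a session window. Not Beam API; in a
// real DoFn this would live in a ValueState, and expiry() would drive an
// event-time cleanup timer.
class SessionGapTracker {
  private final Duration gap;
  private Instant lastSeen; // null means no open session

  SessionGapTracker(Duration gap) {
    this.gap = gap;
  }

  // Returns true if this element starts a new session (first element, or the
  // gap since the previous element was exceeded).
  boolean observe(Instant eventTime) {
    boolean newSession = lastSeen == null || eventTime.isAfter(lastSeen.plus(gap));
    lastSeen = eventTime;
    return newSession;
  }

  // Event time at which the open session expires; a timer set here could
  // garbage-collect the state, addressing the unlimited-growth problem.
  Instant expiry() {
    return lastSeen.plus(gap);
  }
}
```

Setting (and resetting) an event-time timer at expiry() on each element is
what would substitute for the window's garbage collection.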

Thanks,
Thomas



On Sat, Sep 9, 2017 at 3:10 PM, Kenneth Knowles 
wrote:

> It is likely that this may be runner specific, but may fit into the
> StatefulDoFnRunner. The approach of adding a tweaked GBK under the hood
> (what the DirectRunner and Dataflow do) won't work; the ParDo needs to
> re-run window merging on its own state. Also, I don't have a ready answer
> for timers.
>
> On Fri, Sep 8, 2017 at 4:22 PM, Reuven Lax 
> wrote:
>
> > Of course if you want to help make stateful DoFn work with session
> windows,
> > I'm sure that would be much appreciated  :)
> >
> > On Fri, Sep 8, 2017 at 4:12 PM, Thomas Weise  wrote:
> >
> > > Thanks, this first of all helped me find a bug in my test (assigning the
> > > timestamps for the Create.of(...) case consistently with TestStream).
> > > After this the result is as you suggest:
> > >
> > > Expected result with a global or fixed window and stateful DoFn; it is
> > > not working with a session window, unless elements have identical timestamps.
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Fri, Sep 8, 2017 at 1:52 PM, Reuven Lax 
> > > wrote:
> > >
> > > > I believe that stateful DoFn does not yet work with merging windows.
> > This
> > > > is an open bug in Beam that should be fixed.
> > > >
> > > > On Fri, Sep 8, 2017 at 12:41 PM, Thomas Weise 
> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Is there a known limitation in TestStream that causes a stateful DoFn
> > > > > not to work with windows (state is not tracked)?
> > > > >
> > > > > The pipeline is:
> > > > >
> > > > > TestStream (or Create.of(...)) -> Session Window -> MapElements ->
> > > > Stateful
> > > > > DoFn
> > > > >
> > > > > With TestStream, state tracks only when I remove the session window.
> > > > > Session window functionality seems to work with other transforms, though.
> > > > >
> > > > > With Create.of(...), the pipeline including the window produces the
> > > > > expected result.
> > > > >
> > > > > Just checking first... I can extract relevant pieces into a gist if
> > > > > what I'm trying is supposed to work.
> > > > >
> > > > > Thanks,
> > > > > Thomas
> > > > >
> > > >
> > >
> >
>


Re: New contributor

2017-09-13 Thread Griselda Cuevas
Welcome!

On Sep 13, 2017 4:43 PM, "Lukasz Cwik"  wrote:

> Welcome.
>
> On Wed, Sep 13, 2017 at 4:30 PM, Reuven Lax 
> wrote:
>
> > Welcome Daniel!
> >
> > Reuven
> >
> > On Wed, Sep 13, 2017 at 4:27 PM, Jesse Anderson <
> je...@bigdatainstitute.io
> > >
> > wrote:
> >
> > > Welcome!
> > >
> > > On Wed, Sep 13, 2017 at 2:24 PM Daniel Oliveira
> > >  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > My name's Daniel Oliveira. I work at Google and I'd like to start
> > > > contributing to this project so I wanted to introduce myself.
> > > >
> > > > I've already read through the contribution guide and I'm excited to
> > start
> > > > making contributions soon!
> > > >
> > > > Thank you,
> > > > Daniel Oliveira
> > > >
> > > --
> > > Thanks,
> > >
> > > Jesse
> > >
> >
>


Re: New contributor

2017-09-13 Thread Lukasz Cwik
Welcome.

On Wed, Sep 13, 2017 at 4:30 PM, Reuven Lax 
wrote:

> Welcome Daniel!
>
> Reuven
>
> On Wed, Sep 13, 2017 at 4:27 PM, Jesse Anderson  >
> wrote:
>
> > Welcome!
> >
> > On Wed, Sep 13, 2017 at 2:24 PM Daniel Oliveira
> >  wrote:
> >
> > > Hi everyone,
> > >
> > > My name's Daniel Oliveira. I work at Google and I'd like to start
> > > contributing to this project so I wanted to introduce myself.
> > >
> > > I've already read through the contribution guide and I'm excited to
> start
> > > making contributions soon!
> > >
> > > Thank you,
> > > Daniel Oliveira
> > >
> > --
> > Thanks,
> >
> > Jesse
> >
>


Re: New contributor

2017-09-13 Thread Reuven Lax
Welcome Daniel!

Reuven

On Wed, Sep 13, 2017 at 4:27 PM, Jesse Anderson 
wrote:

> Welcome!
>
> On Wed, Sep 13, 2017 at 2:24 PM Daniel Oliveira
>  wrote:
>
> > Hi everyone,
> >
> > My name's Daniel Oliveira. I work at Google and I'd like to start
> > contributing to this project so I wanted to introduce myself.
> >
> > I've already read through the contribution guide and I'm excited to start
> > making contributions soon!
> >
> > Thank you,
> > Daniel Oliveira
> >
> --
> Thanks,
>
> Jesse
>


Re: New contributor

2017-09-13 Thread Jesse Anderson
Welcome!

On Wed, Sep 13, 2017 at 2:24 PM Daniel Oliveira
 wrote:

> Hi everyone,
>
> My name's Daniel Oliveira. I work at Google and I'd like to start
> contributing to this project so I wanted to introduce myself.
>
> I've already read through the contribution guide and I'm excited to start
> making contributions soon!
>
> Thank you,
> Daniel Oliveira
>
-- 
Thanks,

Jesse


Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Kenneth Knowles
ValueProvider is global, PCollectionView is per-window, state is
per-step/key/window, etc.

So my unhappiness increases as we move through that list, adding more and
more constraints on correct use, none of which are reflected in the API.
Your description of "its context is an execution of the pipeline" is
accurate for ValueProvider. The question is not merely "which DoFn will
need which side inputs" but in which methods the side input is accessed
(forbidden in every DoFn method other than @ProcessElement and @OnTimer).
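
A minimal sketch of the mechanics under discussion (hypothetical classes, not
Beam API): a thread-local accessor that a runner would install around
@ProcessElement/@OnTimer invocations, so that a get() outside that scope fails
fast instead of silently leaking another element's context.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of a thread-local side input accessor. Names and API
// are illustrative only; this is the scoping/validation idea, not Beam code.
class SideInputScope {
  private static final ThreadLocal<Map<String, Object>> CURRENT = new ThreadLocal<>();

  // Runner-side: install side input values around a user-code invocation.
  static <T> T runScoped(Map<String, Object> sideInputs, Supplier<T> userCode) {
    CURRENT.set(sideInputs);
    try {
      return userCode.get();
    } finally {
      CURRENT.remove(); // never leak into StartBundle/FinishBundle etc.
    }
  }

  // User-side: what an implicit view.get() might delegate to. Access outside
  // an installed scope is a dynamic failure rather than silent corruption.
  @SuppressWarnings("unchecked")
  static <T> T get(String viewId) {
    Map<String, Object> ctx = CURRENT.get();
    if (ctx == null) {
      throw new IllegalStateException(
          "side input accessed outside @ProcessElement/@OnTimer scope");
    }
    return (T) ctx.get(viewId);
  }
}
```

Note that this turns the illegal-access cases above into runtime errors at
best, which is exactly the validation gap being pointed out.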

As for lambdas being more universal - I agree! But the capabilities of
ParDo are not. I don't think we should transparently make them available
anywhere you have a lambda. For example, multiply triggered side inputs
fundamentally alter the semantics of MapElements and Filter to vary over
time. The only reason this isn't a showstopper is that multiply triggered
side inputs have very loose consistency already, and you can write
nondeterministic predicates and map functions anyhow. If either of those
were better, we'd want to keep them that way.

NewDoFn is somewhat tied to the alternative proposal, and there's the
point that since lambdas are cross-language we might reconsider the
ProcessContext (aka "pile of mud") style. But this universality - being the
lowest common denominator across languages - is not a goal. Python already
is quite different from Java, using | and >> and kwarg side inputs to good
effect. And those two languages are quite similar. Go will look entirely
different. For Java, annotation-driven APIs are common and offer important
advantages for readability, validation, and forward/backward compatibility.
And incidentally NewDoFn subsumes ProcessContext.

On Wed, Sep 13, 2017 at 2:32 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> Thanks!
>
> I think most of the issues you point out [validation, scheduling,
> prefetching] are in the area of wiring. I reiterate that they can be solved
> - both of the methods below will give the runner an answer to the low-level
> question "which DoFn will need which side inputs":
>
> 1) Providing withSideInputs() builder methods on transforms that are
> parameterized by user code. If only some side inputs should be made
> available to particular bits of user code, provide more detailed
> withBlahSideInputs() methods - this is up to the transform.
>
> 2) Inferring this from something annotation-driven as indicated in the
> thread, e.g. capturing the PCollectionView in @SideInput-annotated public
> fields. This can't be done on a lambda, because lambdas don't have fields
> [so I think method #1 will keep being necessary], but it can be done on an
> anonymous class.
>
> As for direct access being misleading: I'm not sure I agree. I think the
> intuition for PCollectionView.get() is no more wrong than the intuition for
> ValueProvider.get(): the return value is, logically, context-free [more
> like: its context is an execution of the pipeline], so I have no issue with
> it being accessed implicitly.
>
> On Wed, Sep 13, 2017 at 2:05 PM Kenneth Knowles 
> wrote:
>
> > I made some comments on https://issues.apache.org/jira/browse/BEAM-2950
> > which was filed to do a similar thing for State. Luke correctly pointed
> out
> > that many of the points apply here as well. I said most of the same
> above,
> > but I thought I'd pull them out again from that ticket and rephrase to
> > apply to side inputs:
> >
> >  - Direct access at first appears "more intuitive" because to a newcomer
> it
> > "looks like" normal [captured variable] access. But in fact it is nothing
> > like normal [captured variable] access so this intuition is misleading
> and
> > should not be encouraged. So it is actually less readable because your
> > intuitive reading is wrong.
> >
> >  - This design would miss the validation aspect. One way it is different
> > than normal [functional] programming is that there are many places it is
> > illegal to reference [side inputs], such as StartBundle/FinishBundle, or
> > passing to another object. This proposal would turn those into dynamic
> > failures at best, or in the worst case data corruption (runner fails to
> > catch illegal access, and permits some thread-global context to leak)
> >
> >  - It is actually mandatory that we are always able to detect [side
> inputs,
> > or the user has to manually wire them], as it [must be scheduled
> > differently]
> >
> >  - A runner can't automatically prefetch, because it doesn't know which
> > [side input] is used by which methods.
> >
> >  - Magic by mutating stuff into place is just less readable / more error
> > prone.
> >
> > State has even more compelling issues and none of the benefits so my
> +0.75
> > for side inputs (now I am feeling more like +0.25) is a -1 for state. We
> > should definitely not block one feature on all vaguely similar features.
> >
> > Kenn
> >
> >
> >
> > On Wed, Sep 13, 2017 at 1:56 PM, Eugene Kirpichov <
> > 

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Eugene Kirpichov
Thanks!

I think most of the issues you point out [validation, scheduling,
prefetching] are in the area of wiring. I reiterate that they can be solved
- both of the methods below will give the runner an answer to the low-level
question "which DoFn will need which side inputs":

1) Providing withSideInputs() builder methods on transforms that are
parameterized by user code. If only some side inputs should be made
available to particular bits of user code, provide more detailed
withBlahSideInputs() methods - this is up to the transform.

2) Inferring this from something annotation-driven as indicated in the
thread, e.g. capturing the PCollectionView in @SideInput-annotated public
fields. This can't be done on a lambda, because lambdas don't have fields
[so I think method #1 will keep being necessary], but it can be done on an
anonymous class.
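
A sketch of option (2), assuming a hypothetical @SideInput field annotation
(Beam has no such annotation today; all names here are illustrative): the
transform reflects over the fields of the anonymous class to learn which
views the user code needs. This works precisely because an anonymous class,
unlike a lambda, has real fields.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical annotation marking captured side input views.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SideInput {}

class SideInputDiscovery {
  // Collect the values of all @SideInput-annotated fields of a user fn
  // instance, so the transform knows which side inputs to wire up.
  static List<Object> discover(Object userFn) {
    List<Object> views = new ArrayList<>();
    for (Field f : userFn.getClass().getDeclaredFields()) {
      if (f.isAnnotationPresent(SideInput.class)) {
        f.setAccessible(true);
        try {
          views.add(f.get(userFn));
        } catch (IllegalAccessException e) {
          throw new RuntimeException(e);
        }
      }
    }
    return views;
  }
}
```

On a lambda, getClass().getDeclaredFields() yields only synthetic capture
fields with no user annotations, which is why method #1 stays necessary there.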

As for direct access being misleading: I'm not sure I agree. I think the
intuition for PCollectionView.get() is no more wrong than the intuition for
ValueProvider.get(): the return value is, logically, context-free [more
like: its context is an execution of the pipeline], so I have no issue with
it being accessed implicitly.

On Wed, Sep 13, 2017 at 2:05 PM Kenneth Knowles 
wrote:

> I made some comments on https://issues.apache.org/jira/browse/BEAM-2950
> which was filed to do a similar thing for State. Luke correctly pointed out
> that many of the points apply here as well. I said most of the same above,
> but I thought I'd pull them out again from that ticket and rephrase to
> apply to side inputs:
>
>  - Direct access at first appears "more intuitive" because to a newcomer it
> "looks like" normal [captured variable] access. But in fact it is nothing
> like normal [captured variable] access so this intuition is misleading and
> should not be encouraged. So it is actually less readable because your
> intuitive reading is wrong.
>
>  - This design would miss the validation aspect. One way it is different
> than normal [functional] programming is that there are many places it is
> illegal to reference [side inputs], such as StartBundle/FinishBundle, or
> passing to another object. This proposal would turn those into dynamic
> failures at best, or in the worst case data corruption (runner fails to
> catch illegal access, and permits some thread-global context to leak)
>
>  - It is actually mandatory that we are always able to detect [side inputs,
> or the user has to manually wire them], as it [must be scheduled
> differently]
>
>  - A runner can't automatically prefetch, because it doesn't know which
> [side input] is used by which methods.
>
>  - Magic by mutating stuff into place is just less readable / more error
> prone.
>
> State has even more compelling issues and none of the benefits so my +0.75
> for side inputs (now I am feeling more like +0.25) is a -1 for state. We
> should definitely not block one feature on all vaguely similar features.
>
> Kenn
>
>
>
> On Wed, Sep 13, 2017 at 1:56 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > On Wed, Sep 13, 2017 at 1:44 PM Robert Bradshaw
> > 
> > wrote:
> >
> > > On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov
> > >  wrote:
> > > > Hi Robert,
> > > >
> > > > Given the anticipated usage of this proposal in Java, I'm not sure
> the
> > > > Python approach you quoted is the right one.
> > >
> > > Perhaps not, but does that mean it would be a Java-ism only or would
> > > we implement it in Python despite it being worse there?
> > >
> > I'm not sure, but I don't see why the proposed approach of view.get()
> > wouldn't work well, or be harder to implement in Python.
> >
> >
> > >
> > > > The main reason: I see how it works with Map/FlatMap, but what about
> > > cases
> > > > like FileIO.write(), parameterized by several lambdas (element ->
> > > > destination, destination -> filename policy, destination -> sink),
> > where
> > > > different lambdas may want to access different side inputs? It feels
> > > > excessive to make each of the lambdas take all of the side inputs in
> > the
> > > > same order; moreover, if the composite transform internally needs to
> > pass
> > > > some more side inputs to the DoFn's executing these lambdas, it will
> > need
> > > > to manipulate the argument lists in nontrivial ways to make sure it
> > > passes
> > > > them only the side inputs the user asked for, and in the proper
> order.
> > >
> > > In Python it would be trivial to "slice" the side input arguments
> > > across the lambdas in a natural way, but I can see that this would be
> > > more of a pain in Java, especially as lambdas are unnecessarily
> > > crippled during compilation.
> > >
> > > > Another reason is, I think with Java's type system it's impossible to
> > > have
> > > > a NewDoFn-style API for lambdas, because annotations on lambda
> > arguments
> > > > are dropped when the lambda is converted to the respective
> > 

Re: Report to the Board, September 2017 edition

2017-09-13 Thread Davor Bonaci
Thanks everyone for a super quick turnaround! (The report is now submitted.)

On Wed, Sep 13, 2017 at 10:45 AM, Davor Bonaci  wrote:

> We are expected to submit a project report to the ASF Board of Directors
> ahead of its next meeting. The report is due on Wednesday, 9/13.
>
> If interested, please take a look at the draft [1], and comment or
> contribute content, as appropriate. I'll submit the report sometime in the
> next 24 hours.
>
> Thanks!
>
> Davor
>
> [1] https://docs.google.com/document/d/1uX8k99k2OXD6tizsJQ9KZtfxbO_I-8rC_oOJgkcJ_O8/
>


New contributor

2017-09-13 Thread Daniel Oliveira
Hi everyone,

My name's Daniel Oliveira. I work at Google and I'd like to start
contributing to this project so I wanted to introduce myself.

I've already read through the contribution guide and I'm excited to start
making contributions soon!

Thank you,
Daniel Oliveira


Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Robert Bradshaw
On Wed, Sep 13, 2017 at 1:56 PM, Eugene Kirpichov
 wrote:
> On Wed, Sep 13, 2017 at 1:44 PM Robert Bradshaw 
> wrote:
>
>> On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov
>>  wrote:
>> > Hi Robert,
>> >
>> > Given the anticipated usage of this proposal in Java, I'm not sure the
>> > Python approach you quoted is the right one.
>>
>> Perhaps not, but does that mean it would be a Java-ism only or would
>> we implement it in Python despite it being worse there?
>>
> I'm not sure, but I don't see why the proposed approach of view.get()
> wouldn't work well, or be harder to implement in Python.

Sure, one could implement this in Python, but why? It has few, if any,
advantages in Python, but does have several disadvantages compared to the way
side inputs are currently handled. The only reason to do so would be
because it works better for the Java API.

>> > The main reason: I see how it works with Map/FlatMap, but what about
>> cases
>> > like FileIO.write(), parameterized by several lambdas (element ->
>> > destination, destination -> filename policy, destination -> sink), where
>> > different lambdas may want to access different side inputs? It feels
>> > excessive to make each of the lambdas take all of the side inputs in the
>> > same order; moreover, if the composite transform internally needs to pass
>> > some more side inputs to the DoFn's executing these lambdas, it will need
>> > to manipulate the argument lists in nontrivial ways to make sure it
>> passes
>> > them only the side inputs the user asked for, and in the proper order.
>>
>> In Python it would be trivial to "slice" the side input arguments
>> across the lambdas in a natural way, but I can see that this would be
>> more of a pain in Java, especially as lambdas are unnecessarily
>> crippled during compilation.
>>
>> > Another reason is, I think with Java's type system it's impossible to
>> have
>> > a NewDoFn-style API for lambdas, because annotations on lambda arguments
>> > are dropped when the lambda is converted to the respective single-method
>> > interface - a lambda is subject to a lot more type erasure than anonymous
>> > class.
>>
>> Yeah, this is unfortunate. But, as mentioned, side inputs don't need
>> to be annotated, just counted. For something like inspecting the
>> window the NewDoFn has a lot of advantages over implicit access (and
>> makes it so you can't "forget" to declare your dependency), but I do
>> see advantages for the implicit way of doing things for delegating to
>> other callables.
>>
>> On the other hand, there is a bit of precedence for this: metrics have
>> the "implicit" api. If we do go this direction for side inputs, we
>> should also consider it for state and side outputs.
>>
> I think Kenn is very strongly against using it for state, whereas I don't
> have an opinion either way because I can't think of a use case for
> accessing state from a lambda - we should probably discuss this separately,
> with proposed code examples in front of us.
>
> For side outputs, yes, it might be nice to ".add()" to a PCollection, but
> it would require bigger changes - e.g. creating intermediate PCollection's
> and inserting an implicit Flatten in front of all steps that contribute to
> this PCollection, because a PCollection currently can be produced only by 1
> step. Maybe there's a different way to express implicit side outputs.
> Either way I support the idea of looking for such a way because it would
> simplify use cases such as error handling dead-letter collections.

I was imagining something more like a TupleTag.output(...) which would
affect that ParDo only rather than appending to an existing
PCollection from anywhere (which is a really big change, but could
also be useful for dead-letter stuff). Let's table that for now.

> I guess the bigger point is: do we want to block the discussion of implicit
> side inputs on making a decision about the implicitness of other things
> (side outputs, state, PipelineOptions, window etc). I can see the argument
> for a "yes, block", but can also see the argument for a "no, don't block" -
> because this proposal is (as indicated earlier in the thread)
> forward-compatible with annotation-based wiring, because we already have a
> precedent for implicit access of something via ValueProvider, and because
> of the advantages it offers.

Shouldn't block indefinitely, but I think it's quite relevant and
should be taken into consideration.

> Want to mention another advantage: lambdas are likely to be much easier
> than NewDoFn approach to use from non-Java but JVM languages/SDKs (e.g.
> Scio), which might have even more type erasure, or might have less
> sophisticated annotation machinery, or NewDoFn-style anonymous classes
> might be highly non-idiomatic in them. Lambdas are idiomatic in every
> language that supports lambdas, which these days is basically every
> language. [I might be opening a can of 

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Kenneth Knowles
I made some comments on https://issues.apache.org/jira/browse/BEAM-2950
which was filed to do a similar thing for State. Luke correctly pointed out
that many of the points apply here as well. I said most of the same above,
but I thought I'd pull them out again from that ticket and rephrase to
apply to side inputs:

 - Direct access at first appears "more intuitive" because to a newcomer it
"looks like" normal [captured variable] access. But in fact it is nothing
like normal [captured variable] access so this intuition is misleading and
should not be encouraged. So it is actually less readable because your
intuitive reading is wrong.

 - This design would miss the validation aspect. One way it is different
than normal [functional] programming is that there are many places it is
illegal to reference [side inputs], such as StartBundle/FinishBundle, or
passing to another object. This proposal would turn those into dynamic
failures at best, or in the worst case data corruption (runner fails to
catch illegal access, and permits some thread-global context to leak)

 - It is actually mandatory that we are always able to detect [side inputs,
or the user has to manually wire them], as it [must be scheduled
differently]

 - A runner can't automatically prefetch, because it doesn't know which
[side input] is used by which methods.

 - Magic by mutating stuff into place is just less readable / more error
prone.

State has even more compelling issues and none of the benefits so my +0.75
for side inputs (now I am feeling more like +0.25) is a -1 for state. We
should definitely not block one feature on all vaguely similar features.

Kenn



On Wed, Sep 13, 2017 at 1:56 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> On Wed, Sep 13, 2017 at 1:44 PM Robert Bradshaw
> 
> wrote:
>
> > On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov
> >  wrote:
> > > Hi Robert,
> > >
> > > Given the anticipated usage of this proposal in Java, I'm not sure the
> > > Python approach you quoted is the right one.
> >
> > Perhaps not, but does that mean it would be a Java-ism only or would
> > we implement it in Python despite it being worse there?
> >
> I'm not sure, but I don't see why the proposed approach of view.get()
> wouldn't work well, or be harder to implement in Python.
>
>
> >
> > > The main reason: I see how it works with Map/FlatMap, but what about
> > cases
> > > like FileIO.write(), parameterized by several lambdas (element ->
> > > destination, destination -> filename policy, destination -> sink),
> where
> > > different lambdas may want to access different side inputs? It feels
> > > excessive to make each of the lambdas take all of the side inputs in
> the
> > > same order; moreover, if the composite transform internally needs to
> pass
> > > some more side inputs to the DoFn's executing these lambdas, it will
> need
> > > to manipulate the argument lists in nontrivial ways to make sure it
> > passes
> > > them only the side inputs the user asked for, and in the proper order.
> >
> > In Python it would be trivial to "slice" the side input arguments
> > across the lambdas in a natural way, but I can see that this would be
> > more of a pain in Java, especially as lambdas are unnecessarily
> > crippled during compilation.
> >
> > > Another reason is, I think with Java's type system it's impossible to
> > have
> > > a NewDoFn-style API for lambdas, because annotations on lambda
> arguments
> > > are dropped when the lambda is converted to the respective
> single-method
> > > interface - a lambda is subject to a lot more type erasure than
> anonymous
> > > class.
> >
> > Yeah, this is unfortunate. But, as mentioned, side inputs don't need
> > to be annotated, just counted. For something like inspecting the
> > window the NewDoFn has a lot of advantages over implicit access (and
> > makes it so you can't "forget" to declare your dependency), but I do
> > see advantages for the implicit way of doing things for delegating to
> > other callables.
> >
> > On the other hand, there is a bit of precedence for this: metrics have
> > the "implicit" api. If we do go this direction for side inputs, we
> > should also consider it for state and side outputs.
> >
> I think Kenn is very strongly against using it for state, whereas I don't
> have an opinion either way because I can't think of a use case for
> accessing state from a lambda - we should probably discuss this separately,
> with proposed code examples in front of us.
>
> For side outputs, yes, it might be nice to ".add()" to a PCollection, but
> it would require bigger changes - e.g. creating intermediate PCollection's
> and inserting an implicit Flatten in front of all steps that contribute to
> this PCollection, because a PCollection currently can be produced only by 1
> step. Maybe there's a different way to express implicit side outputs.
> Either way I support the idea of looking for such a way because it 

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Eugene Kirpichov
On Wed, Sep 13, 2017 at 1:44 PM Robert Bradshaw 
wrote:

> On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov
>  wrote:
> > Hi Robert,
> >
> > Given the anticipated usage of this proposal in Java, I'm not sure the
> > Python approach you quoted is the right one.
>
> Perhaps not, but does that mean it would be a Java-ism only or would
> we implement it in Python despite it being worse there?
>
I'm not sure, but I don't see why the proposed approach of view.get()
wouldn't work well, or be harder to implement in Python.


>
> > The main reason: I see how it works with Map/FlatMap, but what about
> cases
> > like FileIO.write(), parameterized by several lambdas (element ->
> > destination, destination -> filename policy, destination -> sink), where
> > different lambdas may want to access different side inputs? It feels
> > excessive to make each of the lambdas take all of the side inputs in the
> > same order; moreover, if the composite transform internally needs to pass
> > some more side inputs to the DoFn's executing these lambdas, it will need
> > to manipulate the argument lists in nontrivial ways to make sure it
> passes
> > them only the side inputs the user asked for, and in the proper order.
>
> In Python it would be trivial to "slice" the side input arguments
> across the lambdas in a natural way, but I can see that this would be
> more of a pain in Java, especially as lambdas are unnecessarily
> crippled during compilation.
>
> > Another reason is, I think with Java's type system it's impossible to
> have
> > a NewDoFn-style API for lambdas, because annotations on lambda arguments
> > are dropped when the lambda is converted to the respective single-method
> > interface - a lambda is subject to a lot more type erasure than anonymous
> > class.
>
> Yeah, this is unfortunate. But, as mentioned, side inputs don't need
> to be annotated, just counted. For something like inspecting the
> window the NewDoFn has a lot of advantages over implicit access (and
> makes it so you can't "forget" to declare your dependency), but I do
> see advantages for the implicit way of doing things for delegating to
> other callables.
>
> On the other hand, there is a bit of precedence for this: metrics have
> the "implicit" api. If we do go this direction for side inputs, we
> should also consider it for state and side outputs.
>
I think Kenn is very strongly against using it for state, whereas I don't
have an opinion either way because I can't think of a use case for
accessing state from a lambda - we should probably discuss this separately,
with proposed code examples in front of us.

For side outputs, yes, it might be nice to ".add()" to a PCollection, but
it would require bigger changes - e.g. creating intermediate PCollections
and inserting an implicit Flatten in front of all steps that contribute to
this PCollection, because a PCollection can currently be produced by only one
step. Maybe there's a different way to express implicit side outputs.
Either way I support the idea of looking for such a way, because it would
simplify use cases such as error-handling dead-letter collections.
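
The ".add()"-style idea could look roughly like this (an entirely
hypothetical API, mirroring the thread-local side input accessor but for
outputs): user code emits to a named tag, and the runner installs a collector
around each element.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of implicit side outputs, not Beam API: a thread-local
// collector the runner installs per element, so user code can emit to tagged
// outputs (e.g. a dead-letter collection) without an explicit receiver.
class ImplicitOutputs {
  private static final ThreadLocal<Map<String, List<Object>>> COLLECTED =
      new ThreadLocal<>();

  // User-side: emit a value to a tagged output.
  static void emit(String tag, Object value) {
    Map<String, List<Object>> sink = COLLECTED.get();
    if (sink == null) {
      throw new IllegalStateException("emit() called outside element processing");
    }
    sink.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
  }

  // Runner-side: run user code for one element and gather what it emitted.
  static Map<String, List<Object>> collect(Runnable userCode) {
    COLLECTED.set(new HashMap<>());
    try {
      userCode.run();
      return COLLECTED.get();
    } finally {
      COLLECTED.remove();
    }
  }
}
```

Scoping emission to the current ParDo this way sidesteps the implicit-Flatten
problem, at the cost of the same dynamic-failure caveats as implicit side
inputs.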

I guess the bigger point is: do we want to block the discussion of implicit
side inputs on making a decision about the implicitness of other things
(side outputs, state, PipelineOptions, window etc). I can see the argument
for a "yes, block", but can also see the argument for a "no, don't block" -
because this proposal is (as indicated earlier in the thread)
forward-compatible with annotation-based wiring, because we already have a
precedent for implicit access of something via ValueProvider, and because
of the advantages it offers.

Want to mention another advantage: lambdas are likely to be much easier
than NewDoFn approach to use from non-Java but JVM languages/SDKs (e.g.
Scio), which might have even more type erasure, or might have less
sophisticated annotation machinery, or NewDoFn-style anonymous classes
might be highly non-idiomatic in them. Lambdas are idiomatic in every
language that supports lambdas, which these days is basically every
language. [I might be opening a can of worms here, but I guess you can
consider this an argument against NewDoFn in general - though that's
certainly outside the scope of this thread].


>
> > On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw
> 
> > wrote:
> >
> >> +1 to reducing the amount of boilerplate for dealing with side inputs.
> >>
> >> I prefer the "NewDoFn" style of side inputs for consistency. The
> >> primary drawback seems to be lambda's incompatibility with
> >> annotations. This is solved in Python by letting the first argument of
> >> the process method be the main input, and subsequent ones be the side
> >> inputs. For example
> >>
> >> main_pcoll | beam.Map(
> >>     lambda main_input_elem, side_input_value:
> >>         main_input_elem + side_input_value,
> >>     side_input_pvalue)
> >>
> 

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Robert Bradshaw
On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov
 wrote:
> Hi Robert,
>
> Given the anticipated usage of this proposal in Java, I'm not sure the
> Python approach you quoted is the right one.

Perhaps not, but does that mean it would be a Java-ism only or would
we implement it in Python despite it being worse there?

> The main reason: I see how it works with Map/FlatMap, but what about cases
> like FileIO.write(), parameterized by several lambdas (element ->
> destination, destination -> filename policy, destination -> sink), where
> different lambdas may want to access different side inputs? It feels
> excessive to make each of the lambdas take all of the side inputs in the
> same order; moreover, if the composite transform internally needs to pass
> some more side inputs to the DoFn's executing these lambdas, it will need
> to manipulate the argument lists in nontrivial ways to make sure it passes
> them only the side inputs the user asked for, and in the proper order.

In Python it would be trivial to "slice" the side input arguments
across the lambdas in a natural way, but I can see that this would be
more of a pain in Java, especially as lambdas are unnecessarily
crippled during compilation.

> Another reason is, I think with Java's type system it's impossible to have
> a NewDoFn-style API for lambdas, because annotations on lambda arguments
> are dropped when the lambda is converted to the respective single-method
> interface - a lambda is subject to a lot more type erasure than an
> anonymous class.

Yeah, this is unfortunate. But, as mentioned, side inputs don't need
to be annotated, just counted. For something like inspecting the
window the NewDoFn has a lot of advantages over implicit access (and
makes it so you can't "forget" to declare your dependency), but I do
see advantages for the implicit way of doing things for delegating to
other callables.

On the other hand, there is a bit of precedent for this: metrics have
the "implicit" API. If we do go this direction for side inputs, we
should also consider it for state and side outputs.
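To make the "implicit" shape concrete, here is a minimal thread-local
sketch in Python - `SideInputAccessor` and `side_input` are invented
names for illustration, not the actual metrics or side-input API:

```python
import threading

_context = threading.local()

class SideInputAccessor:
    """Installs a view -> value mapping for the current thread, roughly
    how a runner might scope it around a ProcessElement call (or, per the
    per-bundle suggestion elsewhere in the thread, a whole bundle)."""
    def __init__(self, view_to_value):
        self._values = view_to_value

    def __enter__(self):
        _context.accessor = self
        return self

    def __exit__(self, *exc):
        _context.accessor = None

def side_input(view):
    """Implicit access from arbitrary user code on the same thread."""
    accessor = getattr(_context, 'accessor', None)
    if accessor is None:
        raise RuntimeError('side_input() called outside a ProcessElement scope')
    return accessor._values[view]

# Runner side: install once, then any user callback can read the view.
with SideInputAccessor({'my_view': 42}):
    print(side_input('my_view'))  # 42
```

One failure mode falls out naturally: calling `side_input` from a thread
without the accessor installed raises instead of silently returning
stale data.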

> On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw 
> wrote:
>
>> +1 to reducing the amount of boilerplate for dealing with side inputs.
>>
>> I prefer the "NewDoFn" style of side inputs for consistency. The
>> primary drawback seems to be lambda's incompatibility with
>> annotations. This is solved in Python by letting the first
>> argument of the process method be the main input, and
>> subsequent ones be the side inputs. For example
>>
>> main_pcoll | beam.Map(
>> lambda main_input_elem, side_input_value: main_input_elem +
>> side_input_value,
>> side_input_pvalue)
>>
>> For multiple side inputs they are mapped positionally (though Python
>> has the advantage that arguments can be passed by keyword as well to
>> enhance readability when there are many of them, and we allow that for
>> side inputs). Note that side_input_pvalue is not referenced anywhere
>> else, so we don't even have to store it and pass it around (one
>> typically writes pvalue.AsList(some_pcoll) inline here). When the
>> concrete PCollectionView is used to access the value this means that
>> it must be passed separately to both the ParDo and the callback
>> (unless we can infer it, which I don't think we can do in all (many?)
>> cases).
>>
>> There's no reason we couldn't do this, or something very similar, in
>> Java as well.
>>
>> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax 
>> wrote:
>> > On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <
>> > kirpic...@google.com.invalid> wrote:
>> >
>> >> Hi,
>> >>
>> >> I agree with these concerns to an extent, however I think the advantage
>> of
>> >> transparently letting any user code access side inputs, especially
>> >> including lambdas, is so great that we should find a way to address
>> these
>> >> concerns within the constraints of the pattern I'm proposing. See more
>> >> below.
>> >>
>> >> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers
>> > >> >
>> >> wrote:
>> >>
>> >> > One possible issue with this is that updating a thread local is
>> likely to
>> >> > be much more expensive than passing an additional argument.
>> >>
>> >> This is an implementation detail that can be fixed - Luke made a
>> suggestion
>> >> on the PR to set up the side input context once per bundle rather than
>> once
>> >> per element.
>> >>
>> >
>> > However remember that bundles might be small. Dataflow streaming runner
>> > creates small bundles by design. The Flink runner creates single-element
>> > bundles.
>> >
>> >
>> >>
>> >>
>> >> > Also, not all
>> >> > code called from within the DoFn will necessarily be in the same
>> thread
>> >> > (eg., sometimes we create a pool of threads for doing work).
>> >>
>> >> I think we already require that c.output() can not be done from multiple
>> >> threads; and I don't think we document 

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Eugene Kirpichov
Hi Robert,

Given the anticipated usage of this proposal in Java, I'm not sure the
Python approach you quoted is the right one.

The main reason: I see how it works with Map/FlatMap, but what about cases
like FileIO.write(), parameterized by several lambdas (element ->
destination, destination -> filename policy, destination -> sink), where
different lambdas may want to access different side inputs? It feels
excessive to make each of the lambdas take all of the side inputs in the
same order; moreover, if the composite transform internally needs to pass
some more side inputs to the DoFn's executing these lambdas, it will need
to manipulate the argument lists in nontrivial ways to make sure it passes
them only the side inputs the user asked for, and in the proper order.

Another reason is, I think with Java's type system it's impossible to have
a NewDoFn-style API for lambdas, because annotations on lambda arguments
are dropped when the lambda is converted to the respective single-method
interface - a lambda is subject to a lot more type erasure than an
anonymous class.

On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw 
wrote:

> +1 to reducing the amount of boilerplate for dealing with side inputs.
>
> I prefer the "NewDoFn" style of side inputs for consistency. The
> primary drawback seems to be lambda's incompatibility with
> annotations. This is solved in Python by letting the first
> argument of the process method be the main input, and
> subsequent ones be the side inputs. For example
>
> main_pcoll | beam.Map(
> lambda main_input_elem, side_input_value: main_input_elem +
> side_input_value,
> side_input_pvalue)
>
> For multiple side inputs they are mapped positionally (though Python
> has the advantage that arguments can be passed by keyword as well to
> enhance readability when there are many of them, and we allow that for
> side inputs). Note that side_input_pvalue is not referenced anywhere
> else, so we don't even have to store it and pass it around (one
> typically writes pvalue.AsList(some_pcoll) inline here). When the
> concrete PCollectionView is used to access the value this means that
> it must be passed separately to both the ParDo and the callback
> (unless we can infer it, which I don't think we can do in all (many?)
> cases).
>
> There's no reason we couldn't do this, or something very similar, in
> Java as well.
>
> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax 
> wrote:
> > On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> >> Hi,
> >>
> >> I agree with these concerns to an extent, however I think the advantage
> of
> >> transparently letting any user code access side inputs, especially
> >> including lambdas, is so great that we should find a way to address
> these
> >> concerns within the constraints of the pattern I'm proposing. See more
> >> below.
> >>
> >> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers
>  >> >
> >> wrote:
> >>
> >> > One possible issue with this is that updating a thread local is
> likely to
> >> > be much more expensive than passing an additional argument.
> >>
> >> This is an implementation detail that can be fixed - Luke made a
> suggestion
> >> on the PR to set up the side input context once per bundle rather than
> once
> >> per element.
> >>
> >
> > However remember that bundles might be small. Dataflow streaming runner
> > creates small bundles by design. The Flink runner creates single-element
> > bundles.
> >
> >
> >>
> >>
> >> > Also, not all
> >> > code called from within the DoFn will necessarily be in the same
> thread
> >> > (eg., sometimes we create a pool of threads for doing work).
> >>
> >> I think we already require that c.output() can not be done from multiple
> >> threads; and I don't think we document c.sideInput() to be thread-safe
> - it
> >> may be reasonable to declare that it isn't and has to be accessed from
> the
> >> same thread as the ProcessElement call. If we want to relax this, then
> >> there might be ways to deal with that too, e.g. provide utilities for
> the
> >> user to capture the "user code context" and restoring it inside a
> thread.
> >> This would likely be valuable for other purposes, such as making those
> >> extra threads visible to our profiling utilities.
> >>
> >
> > This seems fair, but we should be very careful about our
> documentation.
> > And +1 to adding utilities to make multi-threaded work easier to manage.
> >
> >>
> >>
> >> > It may be
> >> > *more* confusing for this to sometimes work magically and sometimes
> fail
> >> > horribly. Also, requiring the PCollectionView to be passed to user
> code
> >> > that accesses it is nice because it makes *very clear* that the side
> >> input
> >> > needs to be provided from the DoFn to that particular utility. If it
> is
> >> > accessed via "spooky action at a distance" we lose that piece of
> "free"
> >> > 

Re: BigQueryIO withSchemaFromView

2017-09-13 Thread Chaim Turkel
Just found that with BigQuery you cannot stream to partitions older
than 30 days (so I can't use it anyway to load old data) :(

On Wed, Sep 13, 2017 at 7:08 PM, Lukasz Cwik  wrote:
> Support was added to expose how users want to load their data with
> https://github.com/apache/beam/commit/075d4d45a9cd398f3b4023b6efd495cc58eb9bdd
> It is planned to be released in 2.2.0
>
> On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel  wrote:
>
>> From what I found, if I have windowing with BigQuery partitioning (per
>> day - 1545 partitions) the insert can take 5 hours, whereas with no
>> partitions it takes about 12 minutes.
>>
>> I have 13,843,080 records, 6.76 GB.
>> Any ideas how to get the partitioning to work faster?
>>
>> Is there a way to get the BigQueryIO to use streaming and not jobs?
>>
>> chaim
>>
>> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel  wrote:
>> > I am using windowing for the partition of the table, maybe that has to do
>> with it?
>> >
>> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax 
>> wrote:
>> >> Ok, something is going wrong then. It appears that your job created over
>> >> 14,000 BigQuery load jobs, which is not expected (and probably why
>> things
>> >> were so slow).
>> >>
>> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel  wrote:
>> >>
>> >>> no that specific job created only 2 tables
>> >>>
>> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax 
>> >>> wrote:
>> >>> > It looks like your job is creating about 14,45 distinct BigQuery
>> tables.
>> >>> > Does that sound correct to you?
>> >>> >
>> >>> > Reuven
>> >>> >
>> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel 
>> wrote:
>> >>> >
>> >>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
>> >>> >> as you can see the majority of the time is inserting into bigquery.
>> >>> >> is there any way to parallelize this?
>> >>> >>
>> >>> >> My feeling for the windowing is that writing should be done per
>> window
>> >>> >> (my window is daily) or at least to be able to configure it
>> >>> >>
>> >>> >> chaim
>> >>> >>
>> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax
>> 
>> >>> >> wrote:
>> >>> >> > So the problem is you are running on Dataflow, and it's taking
>> longer
>> >>> >> than
>> >>> >> > you think it should? If you provide the Dataflow job id we can
>> help
>> >>> you
>> >>> >> > debug why it's taking 30 minutes. (and as an aside, if this turns
>> >>> into a
>> >>> >> > Dataflow debugging session we should move it off of the Beam list
>> and
>> >>> >> onto
>> >>> >> > a Dataflow-specific thread)
>> >>> >> >
>> >>> >> > Reuven
>> >>> >> >
>> >>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel 
>> >>> wrote:
>> >>> >> >
>> >>> >> >> is there a way around this, my time for 13gb is not close to 30
>> >>> >> >> minutes, while it should be around 15 minutes.
>> >>> >> >> Do i need to chunk the code myself to windows, and run in
>> parallel?
>> >>> >> >> chaim
>> >>> >> >>
>> >>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax
>> > >>> >
>> >>> >> >> wrote:
>> >>> >> >> > In that case I can say unequivocally that Dataflow (in batch
>> mode)
>> >>> >> does
>> >>> >> >> not
>> >>> >> >> > produce results for a stage until it has processed that entire
>> >>> stage.
>> >>> >> The
>> >>> >> >> > reason for this is that the batch runner is optimized for
>> >>> throughput,
>> >>> >> not
>> >>> >> >> > latency; it wants to minimize the time for the entire job to
>> >>> finish,
>> >>> >> not
>> >>> >> >> > the time till first output. The side input will not be
>> materialized
>> >>> >> until
>> >>> >> >> > all of the data for all of the windows of the side input have
>> been
>> >>> >> >> > processed. The streaming runner on the other hand will produce
>> >>> >> windows as
>> >>> >> >> > they finish. So for the batch runner, there is no performance
>> >>> >> advantage
>> >>> >> >> you
>> >>> >> >> > get for windowing the side input.
>> >>> >> >> >
>> >>> >> >> > The fact that BigQueryIO needs the schema side input to be
>> globally
>> >>> >> >> > windowed is a bit confusing and not well documented. We should
>> add
>> >>> >> better
>> >>> >> >> > javadoc explaining this.
>> >>> >> >> >
>> >>> >> >> > Reuven
>> >>> >> >> >
>> >>> >> >> > On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <
>> ch...@behalf.com>
>> >>> >> wrote:
>> >>> >> >> >
>> >>> >> >> >> batch on dataflow
>> >>> >> >> >>
>> >>> >> >> >> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax
>> >>> > >>> >> >
>> >>> >> >> >> wrote:
>> >>> >> >> >> > Which runner are you using? And is this a batch pipeline?
>> >>> >> >> >> >
>> >>> >> >> >> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <
>> ch...@behalf.com
>> >>> >
>> >>> >> >> wrote:
>> >>> >> >> >> >
>> >>> >> >> >> >> Thank for the answer, but i don't think that that is the
>> 

Re: BigQueryIO withSchemaFromView

2017-09-13 Thread Chaim Turkel
.apply("insert data table - " + table.getTableName(),
    BigQueryIO.writeTableRows()
        .to(TableRefPartition.perDay(options.getBQProject(),
            options.getDatasetId(), table.getBqTableName()))
        .withSchemaFromView(tableSchemas)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)


public class TableRefPartition implements
        SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {

    private final String projectId;
    private final String datasetId;
    private final String pattern;
    private final String table;

    public static TableRefPartition perDay(String projectId,
            String datasetId, String tablePrefix) {
        return new TableRefPartition(projectId, datasetId, "yyyyMMdd",
                tablePrefix + "$");
    }

    private TableRefPartition(String projectId, String datasetId,
            String pattern, String table) {
        this.projectId = projectId;
        this.datasetId = datasetId;
        this.pattern = pattern;
        this.table = table;
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        DateTimeFormatter partition =
                DateTimeFormat.forPattern(pattern).withZoneUTC();

        TableReference reference = new TableReference();
        reference.setProjectId(this.projectId);
        reference.setDatasetId(this.datasetId);

        reference.setTableId(table +
                input.getWindow().maxTimestamp().toString(partition));
        return new TableDestination(reference, null);
    }
}

On Wed, Sep 13, 2017 at 9:40 PM, Reuven Lax  wrote:
> Can you show us some of the code you are using? How are you loading into
> separate partitions?
>
> Reuven
>
> On Wed, Sep 13, 2017 at 10:13 AM, Chaim Turkel  wrote:
>
>>  I am loading into separate partitions of the same table.
>> I want to see if streaming will be faster.
>>
>> Is there a repository where I can use the snapshot version?
>>
>>
>> On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax 
>> wrote:
>> > Ah, so you are loading each window into a separate BigQuery table? That
>> > might be the reason things are slow. Remember a batch job doesn't return
>> > until everything finishes, and if you are loading that many tables it's
>> > entirely possible that BigQuery will throttle you, causing the slowdown.
>> >
>> > A couple of options:
>> >
>> > 1. Instead of loading into separate BigQuery tables, you could load into
>> > separate partitions of the same table. See this page for more info:
>> > https://cloud.google.com/bigquery/docs/partitioned-tables
>> >
>> > 2. If you have a streaming unbounded source for your data, you can run
>> > using a streaming runner. That will load each window as it becomes
>> > available instead of waiting for everything to load.
>> >
>> > Reuven
>> >
>> > On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel  wrote:
>> >
>> >> From what I found, if I have windowing with BigQuery partitioning (per
>> >> day - 1545 partitions) the insert can take 5 hours, whereas with no
>> >> partitions it takes about 12 minutes.
>> >>
>> >> I have 13,843,080 records, 6.76 GB.
>> >> Any ideas how to get the partitioning to work faster?
>> >>
>> >> Is there a way to get the BigQueryIO to use streaming and not jobs?
>> >>
>> >> chaim
>> >>
>> >> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel 
>> wrote:
>> >> > I am using windowing for the partition of the table, maybe that has to
>> do
>> >> with it?
>> >> >
>> >> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax > >
>> >> wrote:
>> >> >> Ok, something is going wrong then. It appears that your job created
>> over
>> >> >> 14,000 BigQuery load jobs, which is not expected (and probably why
>> >> things
>> >> >> were so slow).
>> >> >>
>> >> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel 
>> wrote:
>> >> >>
>> >> >>> no that specific job created only 2 tables
>> >> >>>
>> >> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax
>> 
>> >> >>> wrote:
>> >> >>> > It looks like your job is creating about 14,45 distinct BigQuery
>> >> tables.
>> >> >>> > Does that sound correct to you?
>> >> >>> >
>> >> >>> > Reuven
>> >> >>> >
>> >> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel 
>> >> wrote:
>> >> >>> >
>> >> >>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
>> >> >>> >> as you can see the majority of the time is inserting into
>> bigquery.
>> >> >>> >> is there any way to parallelize this?
>> >> >>> >>
>> >> >>> >> My feeling for the windowing is that writing should be done per
>> >> window
>> >> >>> >> (my window is daily) or at least to be able to configure it
>> >> >>> >>
>> >> >>> >> chaim
>> >> >>> >>
>> >> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax
>> >> 
>> >> >>> >> wrote:
>> >> >>> >> > So the problem is you are running on Dataflow, and it's taking
>> >> longer
>> >> >>> >> than
>> >> >>> >> > you 

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Robert Bradshaw
+1 to reducing the amount of boilerplate for dealing with side inputs.

I prefer the "NewDoFn" style of side inputs for consistency. The
primary drawback seems to be lambda's incompatibility with
annotations. This is solved in Python by letting the first
argument of the process method be the main input, and
subsequent ones be the side inputs. For example

main_pcoll | beam.Map(
lambda main_input_elem, side_input_value: main_input_elem +
side_input_value,
side_input_pvalue)

For multiple side inputs they are mapped positionally (though Python
has the advantage that arguments can be passed by keyword as well to
enhance readability when there are many of them, and we allow that for
side inputs). Note that side_input_pvalue is not referenced anywhere
else, so we don't even have to store it and pass it around (one
typically writes pvalue.AsList(some_pcoll) inline here). When the
concrete PCollectionView is used to access the value this means that
it must be passed separately to both the ParDo and the callback
(unless we can infer it, which I don't think we can do in all (many?)
cases).

There's no reason we couldn't do this, or something very similar, in
Java as well.
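A rough plain-Python model of that positional binding (no Beam
dependency; `bind_side_inputs` is an illustrative stand-in for what
`beam.Map` does with the extra arguments it is given):

```python
def bind_side_inputs(user_fn, *side_values):
    """The first parameter of user_fn receives the main input element;
    the remaining parameters are filled positionally from the side
    inputs supplied alongside the callable."""
    def process(element):
        return user_fn(element, *side_values)
    return process

# Mirrors the beam.Map example above; 10 stands in for the materialized
# value of side_input_pvalue.
process = bind_side_inputs(
    lambda main_input_elem, side_input_value:
        main_input_elem + side_input_value,
    10)
print([process(x) for x in [1, 2, 3]])  # [11, 12, 13]
```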

On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax  wrote:
> On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
>> Hi,
>>
>> I agree with these concerns to an extent, however I think the advantage of
>> transparently letting any user code access side inputs, especially
>> including lambdas, is so great that we should find a way to address these
>> concerns within the constraints of the pattern I'm proposing. See more
>> below.
>>
>> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers > >
>> wrote:
>>
>> > One possible issue with this is that updating a thread local is likely to
>> > be much more expensive than passing an additional argument.
>>
>> This is an implementation detail that can be fixed - Luke made a suggestion
>> on the PR to set up the side input context once per bundle rather than once
>> per element.
>>
>
> However remember that bundles might be small. Dataflow streaming runner
> creates small bundles by design. The Flink runner creates single-element
> bundles.
>
>
>>
>>
>> > Also, not all
>> > code called from within the DoFn will necessarily be in the same thread
>> > (eg., sometimes we create a pool of threads for doing work).
>>
>> I think we already require that c.output() can not be done from multiple
>> threads; and I don't think we document c.sideInput() to be thread-safe - it
>> may be reasonable to declare that it isn't and has to be accessed from the
>> same thread as the ProcessElement call. If we want to relax this, then
>> there might be ways to deal with that too, e.g. provide utilities for the
>> user to capture the "user code context" and restoring it inside a thread.
>> This would likely be valuable for other purposes, such as making those
>> extra threads visible to our profiling utilities.
>>
>
> This seems fair, but we should be very careful about our documentation.
> And +1 to adding utilities to make multi-threaded work easier to manage.
>
>>
>>
>> > It may be
>> > *more* confusing for this to sometimes work magically and sometimes fail
>> > horribly. Also, requiring the PCollectionView to be passed to user code
>> > that accesses it is nice because it makes *very clear* that the side
>> input
>> > needs to be provided from the DoFn to that particular utility. If it is
>> > accessed via "spooky action at a distance" we lose that piece of "free"
>> > documentation, which may lead to extensive misuse of these utility
>> methods.
>> >
>> I'd like to understand this concern better - from this description it's not
>> clear to me. The pattern I'm proposing is that, when you're authoring a
>> PTransform that is configured by any user callbacks, then:
>> - you should provide a builder method .withSideInputs(...)
>> - you should propagate those side inputs to all your internal DoFn's that
>> invoke the user code
>> - in return the user callbacks will be allowed to access those particular
>> side inputs
>> This seems like a simple enough model to me to understand, both from a
>> user's perspective and from a transform author's perspective. Steps 1 and 2
>> may eventually be automated by annotation analysis or other means (e.g. SDK
>> giving a way to provide given side inputs automatically to everything
>> inside a composite transform rather than to individual DoFn's).
>>
>>
>> >
>> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov
>> >  wrote:
>> >
>> > > Hi,
>> > >
>> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles > >
>> > > wrote:
>> > >
>> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <
>> > > > kirpic...@google.com.invalid> wrote:
>> > > >
>> > > > >
>> > > > > The differences are:
>> > > > > - The proposal in the doc allows wiring 

Re: BigQueryIO withSchemaFromView

2017-09-13 Thread Reuven Lax
Can you show us some of the code you are using? How are you loading into
separate partitions?

Reuven

On Wed, Sep 13, 2017 at 10:13 AM, Chaim Turkel  wrote:

>  I am loading into separate partitions of the same table.
> I want to see if streaming will be faster.
>
> Is there a repository where I can use the snapshot version?
>
>
> On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax 
> wrote:
> > Ah, so you are loading each window into a separate BigQuery table? That
> > might be the reason things are slow. Remember a batch job doesn't return
> > until everything finishes, and if you are loading that many tables it's
> > entirely possible that BigQuery will throttle you, causing the slowdown.
> >
> > A couple of options:
> >
> > 1. Instead of loading into separate BigQuery tables, you could load into
> > separate partitions of the same table. See this page for more info:
> > https://cloud.google.com/bigquery/docs/partitioned-tables
> >
> > 2. If you have a streaming unbounded source for your data, you can run
> > using a streaming runner. That will load each window as it becomes
> > available instead of waiting for everything to load.
> >
> > Reuven
> >
> > On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel  wrote:
> >
> >> From what I found, if I have windowing with BigQuery partitioning (per
> >> day - 1545 partitions) the insert can take 5 hours, whereas with no
> >> partitions it takes about 12 minutes.
> >>
> >> I have 13,843,080 records, 6.76 GB.
> >> Any ideas how to get the partitioning to work faster?
> >>
> >> Is there a way to get the BigQueryIO to use streaming and not jobs?
> >>
> >> chaim
> >>
> >> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel 
> wrote:
> >> > I am using windowing for the partition of the table, maybe that has to
> do
> >> with it?
> >> >
> >> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax  >
> >> wrote:
> >> >> Ok, something is going wrong then. It appears that your job created
> over
> >> >> 14,000 BigQuery load jobs, which is not expected (and probably why
> >> things
> >> >> were so slow).
> >> >>
> >> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel 
> wrote:
> >> >>
> >> >>> no that specific job created only 2 tables
> >> >>>
> >> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax
> 
> >> >>> wrote:
> >> >>> > It looks like your job is creating about 14,45 distinct BigQuery
> >> tables.
> >> >>> > Does that sound correct to you?
> >> >>> >
> >> >>> > Reuven
> >> >>> >
> >> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel 
> >> wrote:
> >> >>> >
> >> >>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
> >> >>> >> as you can see the majority of the time is inserting into
> bigquery.
> >> >>> >> is there any way to parallelize this?
> >> >>> >>
> >> >>> >> My feeling for the windowing is that writing should be done per
> >> window
> >> >>> >> (my window is daily) or at least to be able to configure it
> >> >>> >>
> >> >>> >> chaim
> >> >>> >>
> >> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax
> >> 
> >> >>> >> wrote:
> >> >>> >> > So the problem is you are running on Dataflow, and it's taking
> >> longer
> >> >>> >> than
> >> >>> >> > you think it should? If you provide the Dataflow job id we can
> >> help
> >> >>> you
> >> >>> >> > debug why it's taking 30 minutes. (and as an aside, if this
> turns
> >> >>> into a
> >> >>> >> > Dataflow debugging session we should move it off of the Beam
> list
> >> and
> >> >>> >> onto
> >> >>> >> > a Dataflow-specific thread)
> >> >>> >> >
> >> >>> >> > Reuven
> >> >>> >> >
> >> >>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <
> ch...@behalf.com>
> >> >>> wrote:
> >> >>> >> >
> >> >>> >> >> is there a way around this, my time for 13gb is not close to
> 30
> >> >>> >> >> minutes, while it should be around 15 minutes.
> >> >>> >> >> Do i need to chunk the code myself to windows, and run in
> >> parallel?
> >> >>> >> >> chaim
> >> >>> >> >>
> >> >>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax
> >>  >> >>> >
> >> >>> >> >> wrote:
> >> >>> >> >> > In that case I can say unequivocally that Dataflow (in batch
> >> mode)
> >> >>> >> does
> >> >>> >> >> not
> >> >>> >> >> > produce results for a stage until it has processed that
> entire
> >> >>> stage.
> >> >>> >> The
> >> >>> >> >> > reason for this is that the batch runner is optimized for
> >> >>> throughput,
> >> >>> >> not
> >> >>> >> >> > latency; it wants to minimize the time for the entire job to
> >> >>> finish,
> >> >>> >> not
> >> >>> >> >> > the time till first output. The side input will not be
> >> materialized
> >> >>> >> until
> >> >>> >> >> > all of the data for all of the windows of the side input
> have
> >> been
> >> >>> >> >> > processed. The streaming runner on the other hand will
> produce
> >> >>> >> windows as
> >> >>> >> >> > they 

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Reuven Lax
On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> Hi,
>
> I agree with these concerns to an extent, however I think the advantage of
> transparently letting any user code access side inputs, especially
> including lambdas, is so great that we should find a way to address these
> concerns within the constraints of the pattern I'm proposing. See more
> below.
>
> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers  >
> wrote:
>
> > One possible issue with this is that updating a thread local is likely to
> > be much more expensive than passing an additional argument.
>
> This is an implementation detail that can be fixed - Luke made a suggestion
> on the PR to set up the side input context once per bundle rather than once
> per element.
>

However remember that bundles might be small. Dataflow streaming runner
creates small bundles by design. The Flink runner creates single-element
bundles.


>
>
> > Also, not all
> > code called from within the DoFn will necessarily be in the same thread
> > (eg., sometimes we create a pool of threads for doing work).
>
> I think we already require that c.output() can not be done from multiple
> threads; and I don't think we document c.sideInput() to be thread-safe - it
> may be reasonable to declare that it isn't and has to be accessed from the
> same thread as the ProcessElement call. If we want to relax this, then
> there might be ways to deal with that too, e.g. provide utilities for the
> user to capture the "user code context" and restoring it inside a thread.
> This would likely be valuable for other purposes, such as making those
> extra threads visible to our profiling utilities.
>

This seems fair, but we should be very careful about our documentation.
And +1 to adding utilities to make multi-threaded work easier to manage.

>
>
> > It may be
> > *more* confusing for this to sometimes work magically and sometimes fail
> > horribly. Also, requiring the PCollectionView to be passed to user code
> > that accesses it is nice because it makes *very clear* that the side
> input
> > needs to be provided from the DoFn to that particular utility. If it is
> > accessed via "spooky action at a distance" we lose that piece of "free"
> > documentation, which may lead to extensive misuse of these utility
> methods.
> >
> I'd like to understand this concern better - from this description it's not
> clear to me. The pattern I'm proposing is that, when you're authoring a
> PTransform that is configured by any user callbacks, then:
> - you should provide a builder method .withSideInputs(...)
> - you should propagate those side inputs to all your internal DoFn's that
> invoke the user code
> - in return the user callbacks will be allowed to access those particular
> side inputs
> This seems like a simple enough model to me to understand, both from a
> user's perspective and from a transform author's perspective. Steps 1 and 2
> may eventually be automated by annotation analysis or other means (e.g. SDK
> giving a way to provide given side inputs automatically to everything
> inside a composite transform rather than to individual DoFn's).
>
>
> >
> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov
> >  wrote:
> >
> > > Hi,
> > >
> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles  >
> > > wrote:
> > >
> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <
> > > > kirpic...@google.com.invalid> wrote:
> > > >
> > > > >
> > > > > The differences are:
> > > > > - The proposal in the doc allows wiring different side inputs to
> the
> > > same
> > > > > Supplier, but I'm not convinced that this is important - you can
> just
> > > as
> > > > > easily call the constructor of your DoFn passing different
> > > > > PCollectionView's for it to capture.
> > > > >
> > > >
> > > > I disagree with this bit about it being "just as easy". Passing the
> > > needed
> > > > PCollectionViews to your constructor (or even having a constructor)
> is
> > a
> > > > pain. Every time I have to do it, it adds a ton of boilerplate that
> > feels
> > > > like pure noise. To make a DoFn reusable it must be made into a named
> > > class
> > > > with a constructor, versus inlined with no constructor.
> > >
> > > Hm, why? You can have the DoFn be an anonymous class capturing the
> > > PCollectionView into a @SideInput field as a closure.
> > >
> > >
> > > > A generous analogy
> > > > is that it is "just" manual closure conversion/currying, changing
> > > > f(side, main) to f(side)(main). But in practice in Beam the second
> one
> > > has
> > > > much more boilerplate.
> > > >
> > > > Also, Beam is worse. We present the user with higher-order functions,
> > > which
> > > > is where the actual annoyance comes in. When you want to pardo(f) you
> > > have
> > > > to write pardo(f(side))(side, main). Your proposal is to support
> > > > pardo(f(side))(main) and mine is to support 

Report to the Board, September 2017 edition

2017-09-13 Thread Davor Bonaci
We are expected to submit a project report to the ASF Board of Directors
ahead of its next meeting. The report is due on Wednesday, 9/13.

If interested, please take a look at the draft [1], and comment or
contribute content, as appropriate. I'll submit the report sometime in the
next 24 hours.

Thanks!

Davor

[1]
https://docs.google.com/document/d/1uX8k99k2OXD6tizsJQ9KZtfxbO_I-8rC_oOJgkcJ_O8/


Re: BigQueryIO withSchemaFromView

2017-09-13 Thread Chaim Turkel
Just went over the changes for the streaming method.
That looks great.
How about adding an option to continue the apply after a success, with
statistics or something, like in the failure case?

On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax  wrote:
> Ah, so you are loading each window into a separate BigQuery table? That
> might be the reason things are slow. Remember that a batch job doesn't return
> until everything finishes, and if you are loading that many tables it's
> entirely possible that BigQuery will throttle you, causing the slowdown.
>
> A couple of options:
>
> 1. Instead of loading into separate BigQuery tables, you could load into
> separate partitions of the same table. See this page for more info:
> https://cloud.google.com/bigquery/docs/partitioned-tables
>
> 2. If you have a streaming unbounded source for your data, you can run
> using a streaming runner. That will load each window as it becomes
> available instead of waiting for everything to load.
>
> Reuven
>
> On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel  wrote:
>
>> From what I found, if I have the windowing with BigQuery partitions (per
>> day - 1545 partitions) the insert can take 5 hours, whereas if there are
>> no partitions it takes about 12 minutes.
>>
>> I have 13,843,080 records, 6.76 GB.
>> Any ideas how to get the partitioned load to work faster?
>>
>> Is there a way to get the BigQueryIO to use streaming and not jobs?
>>
>> chaim
>>
>> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel  wrote:
>> > I am using windowing for the partition of the table, maybe that has to do
>> with it?
>> >
>> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax 
>> wrote:
>> >> Ok, something is going wrong then. It appears that your job created over
>> >> 14,000 BigQuery load jobs, which is not expected (and probably why
>> things
>> >> were so slow).
>> >>
>> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel  wrote:
>> >>
>> >>> no that specific job created only 2 tables
>> >>>
>> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax 
>> >>> wrote:
>> >>> > It looks like your job is creating about 14,45 distinct BigQuery
>> tables.
>> >>> > Does that sound correct to you?
>> >>> >
>> >>> > Reuven
>> >>> >
>> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel 
>> wrote:
>> >>> >
>> >>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
>> >>> >> as you can see the majority of the time is inserting into bigquery.
>> >>> >> is there any way to parallel this?
>> >>> >>
>> >>> >> My feeling for the windowing is that writing should be done per
>> window
>> >>> >> (my window is daily) or at least to be able to configure it
>> >>> >>
>> >>> >> chaim
>> >>> >>
>> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax
>> 
>> >>> >> wrote:
>> >>> >> > So the problem is you are running on Dataflow, and it's taking
>> longer
>> >>> >> than
>> >>> >> > you think it should? If you provide the Dataflow job id we can
>> help
>> >>> you
>> >>> >> > debug why it's taking 30 minutes. (and as an aside, if this turns
>> >>> into a
>> >>> >> > Dataflow debugging session we should move it off of the Beam list
>> and
>> >>> >> onto
>> >>> >> > a Dataflow-specific thread)
>> >>> >> >
>> >>> >> > Reuven
>> >>> >> >
>> >>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel 
>> >>> wrote:
>> >>> >> >
>> >>> >> >> is there a way around this, my time for 13gb is now close to 30
>> >>> >> >> minutes, while it should be around 15 minutes.
>> >>> >> >> Do i need to chunk the code myself to windows, and run in
>> parallel?
>> >>> >> >> chaim
>> >>> >> >>
>> >>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax
>> > >>> >
>> >>> >> >> wrote:
>> >>> >> >> > In that case I can say unequivocally that Dataflow (in batch
>> mode)
>> >>> >> does
>> >>> >> >> not
>> >>> >> >> > produce results for a stage until it has processed that entire
>> >>> stage.
>> >>> >> The
>> >>> >> >> > reason for this is that the batch runner is optimized for
>> >>> throughput,
>> >>> >> not
>> >>> >> >> > latency; it wants to minimize the time for the entire job to
>> >>> finish,
>> >>> >> not
>> >>> >> >> > the time till first output. The side input will not be
>> materialized
>> >>> >> until
>> >>> >> >> > all of the data for all of the windows of the side input have
>> been
>> >>> >> >> > processed. The streaming runner on the other hand will produce
>> >>> >> windows as
>> >>> >> >> > they finish. So for the batch runner, there is no performance
>> >>> >> advantage
>> >>> >> >> you
>> >>> >> >> > get for windowing the side input.
>> >>> >> >> >
>> >>> >> >> > The fact that BigQueryIO needs the schema side input to be
>> globally
>> >>> >> >> > windowed is a bit confusing and not well documented. We should
>> add
>> >>> >> better
>> >>> >> >> > javadoc explaining this.
>> >>> >> >> >
>> >>> >> >> > Reuven
>> >>> >> >> >

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Lukasz Cwik
Initially I was skeptical of the change, but seeing how many "context"
objects were being created to solve the problem of passing around references
really showed that our API approach was problematic. Using this pattern
allows us to get rid of things like CombineWithContext and the *new*
SideInputAccessor within FileIO. We see the same thing with PipelineOptions
being passed around in "context" objects, and the same will occur with
user state.

On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> Hi,
>
> I agree with these concerns to an extent, however I think the advantage of
> transparently letting any user code access side inputs, especially
> including lambdas, is so great that we should find a way to address these
> concerns within the constraints of the pattern I'm proposing. See more
> below.
>
> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers  >
> wrote:
>
> > One possible issue with this is that updating a thread local is likely to
> > be much more expensive than passing an additional argument.
>
> This is an implementation detail that can be fixed - Luke made a suggestion
> on the PR to set up the side input context once per bundle rather than once
> per element.
>
>
> > Also, not all
> > code called from within the DoFn will necessarily be in the same thread
> > (e.g., sometimes we create a pool of threads for doing work).
>
> I think we already require that c.output() can not be done from multiple
> threads; and I don't think we document c.sideInput() to be thread-safe - it
> may be reasonable to declare that it isn't and has to be accessed from the
> same thread as the ProcessElement call. If we want to relax this, then
> there might be ways to deal with that too, e.g. provide utilities for the
> user to capture the "user code context" and restore it inside a thread.
> This would likely be valuable for other purposes, such as making those
> extra threads visible to our profiling utilities.
>
>
> > It may be
> > *more* confusing for this to sometimes work magically and sometimes fail
> > horribly. Also, requiring the PCollectionView to be passed to user code
> > that accesses it is nice because it makes *very clear* that the side
> input
> > needs to be provided from the DoFn to that particular utility. If it is
> > accessed via "spooky action at a distance" we lose that piece of "free"
> > documentation, which may lead to extensive misuse of these utility
> methods.
> >
> I'd like to understand this concern better - from this description it's not
> clear to me. The pattern I'm proposing is that, when you're authoring a
> PTransform that is configured by any user callbacks, then:
> - you should provide a builder method .withSideInputs(...)
> - you should propagate those side inputs to all your internal DoFn's that
> invoke the user code
> - in return the user callbacks will be allowed to access those particular
> side inputs
> This seems like a simple enough model to me to understand, both from a
> user's perspective and from a transform author's perspective. Steps 1 and 2
> may eventually be automated by annotation analysis or other means (e.g. SDK
> giving a way to provide given side inputs automatically to everything
> inside a composite transform rather than to individual DoFn's).
>
>
> >
> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov
> >  wrote:
> >
> > > Hi,
> > >
> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles  >
> > > wrote:
> > >
> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <
> > > > kirpic...@google.com.invalid> wrote:
> > > >
> > > > >
> > > > > The differences are:
> > > > > - The proposal in the doc allows wiring different side inputs to
> the
> > > same
> > > > > Supplier, but I'm not convinced that this is important - you can
> just
> > > as
> > > > > easily call the constructor of your DoFn passing different
> > > > > PCollectionView's for it to capture.
> > > > >
> > > >
> > > > I disagree with this bit about it being "just as easy". Passing the
> > > needed
> > > > PCollectionViews to your constructor (or even having a constructor)
> is
> > a
> > > > pain. Every time I have to do it, it adds a ton of boilerplate that
> > feels
> > > > like pure noise. To make a DoFn reusable it must be made into a named
> > > class
> > > > with a constructor, versus inlined with no constructor.
> > >
> > > Hm, why? You can have the DoFn be an anonymous class capturing the
> > > PCollectionView into a @SideInput field as a closure.
> > >
> > >
> > > > A generous analogy
> > > > is that it is "just" manual closure conversion/currying, changing
> > > > f(side, main) to f(side)(main). But in practice in Beam the second
> one
> > > has
> > > > much more boilerplate.
> > > >
> > > > Also, Beam is worse. We present the user with higher-order functions,
> > > which
> > > > is where the actual annoyance comes in. When you want to pardo(f) 

Re: BigQueryIO withSchemaFromView

2017-09-13 Thread Chaim Turkel
I am loading into separate partitions of the same table.
I want to see if streaming will be faster.

Is there a repository where I can use the snapshot version?


On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax  wrote:
> Ah, so you are loading each window into a separate BigQuery table? That
> might be the reason things are slow. Remember that a batch job doesn't return
> until everything finishes, and if you are loading that many tables it's
> entirely possible that BigQuery will throttle you, causing the slowdown.
>
> A couple of options:
>
> 1. Instead of loading into separate BigQuery tables, you could load into
> separate partitions of the same table. See this page for more info:
> https://cloud.google.com/bigquery/docs/partitioned-tables
>
> 2. If you have a streaming unbounded source for your data, you can run
> using a streaming runner. That will load each window as it becomes
> available instead of waiting for everything to load.
>
> Reuven
>
> On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel  wrote:
>
>> From what I found, if I have the windowing with BigQuery partitions (per
>> day - 1545 partitions) the insert can take 5 hours, whereas if there are
>> no partitions it takes about 12 minutes.
>>
>> I have 13,843,080 records, 6.76 GB.
>> Any ideas how to get the partitioned load to work faster?
>>
>> Is there a way to get the BigQueryIO to use streaming and not jobs?
>>
>> chaim
>>
>> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel  wrote:
>> > I am using windowing for the partition of the table, maybe that has to do
>> with it?
>> >
>> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax 
>> wrote:
>> >> Ok, something is going wrong then. It appears that your job created over
>> >> 14,000 BigQuery load jobs, which is not expected (and probably why
>> things
>> >> were so slow).
>> >>
>> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel  wrote:
>> >>
>> >>> no that specific job created only 2 tables
>> >>>
>> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax 
>> >>> wrote:
>> >>> > It looks like your job is creating about 14,45 distinct BigQuery
>> tables.
>> >>> > Does that sound correct to you?
>> >>> >
>> >>> > Reuven
>> >>> >
>> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel 
>> wrote:
>> >>> >
>> >>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
>> >>> >> as you can see the majority of the time is inserting into bigquery.
>> >>> >> is there any way to parallel this?
>> >>> >>
>> >>> >> My feeling for the windowing is that writing should be done per
>> window
>> >>> >> (my window is daily) or at least to be able to configure it
>> >>> >>
>> >>> >> chaim
>> >>> >>
>> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax
>> 
>> >>> >> wrote:
>> >>> >> > So the problem is you are running on Dataflow, and it's taking
>> longer
>> >>> >> than
>> >>> >> > you think it should? If you provide the Dataflow job id we can
>> help
>> >>> you
>> >>> >> > debug why it's taking 30 minutes. (and as an aside, if this turns
>> >>> into a
>> >>> >> > Dataflow debugging session we should move it off of the Beam list
>> and
>> >>> >> onto
>> >>> >> > a Dataflow-specific thread)
>> >>> >> >
>> >>> >> > Reuven
>> >>> >> >
>> >>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel 
>> >>> wrote:
>> >>> >> >
>> >>> >> >> is there a way around this, my time for 13gb is now close to 30
>> >>> >> >> minutes, while it should be around 15 minutes.
>> >>> >> >> Do i need to chunk the code myself to windows, and run in
>> parallel?
>> >>> >> >> chaim
>> >>> >> >>
>> >>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax
>> > >>> >
>> >>> >> >> wrote:
>> >>> >> >> > In that case I can say unequivocally that Dataflow (in batch
>> mode)
>> >>> >> does
>> >>> >> >> not
>> >>> >> >> > produce results for a stage until it has processed that entire
>> >>> stage.
>> >>> >> The
>> >>> >> >> > reason for this is that the batch runner is optimized for
>> >>> throughput,
>> >>> >> not
>> >>> >> >> > latency; it wants to minimize the time for the entire job to
>> >>> finish,
>> >>> >> not
>> >>> >> >> > the time till first output. The side input will not be
>> materialized
>> >>> >> until
>> >>> >> >> > all of the data for all of the windows of the side input have
>> been
>> >>> >> >> > processed. The streaming runner on the other hand will produce
>> >>> >> windows as
>> >>> >> >> > they finish. So for the batch runner, there is no performance
>> >>> >> advantage
>> >>> >> >> you
>> >>> >> >> > get for windowing the side input.
>> >>> >> >> >
>> >>> >> >> > The fact that BigQueryIO needs the schema side input to be
>> globally
>> >>> >> >> > windowed is a bit confusing and not well documented. We should
>> add
>> >>> >> better
>> >>> >> >> > javadoc explaining this.
>> >>> >> >> >
>> >>> >> >> > Reuven
>> >>> >> >> >
>> >>> >> >> > On 

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Eugene Kirpichov
Hi,

I agree with these concerns to an extent, however I think the advantage of
transparently letting any user code access side inputs, especially
including lambdas, is so great that we should find a way to address these
concerns within the constraints of the pattern I'm proposing. See more
below.

On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers 
wrote:

> One possible issue with this is that updating a thread local is likely to
> be much more expensive than passing an additional argument.

This is an implementation detail that can be fixed - Luke made a suggestion
on the PR to set up the side input context once per bundle rather than once
per element.


> Also, not all
> code called from within the DoFn will necessarily be in the same thread
> (e.g., sometimes we create a pool of threads for doing work).

I think we already require that c.output() can not be done from multiple
threads; and I don't think we document c.sideInput() to be thread-safe - it
may be reasonable to declare that it isn't and has to be accessed from the
same thread as the ProcessElement call. If we want to relax this, then
there might be ways to deal with that too, e.g. provide utilities for the
user to capture the "user code context" and restore it inside a thread.
This would likely be valuable for other purposes, such as making those
extra threads visible to our profiling utilities.
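The capture-and-restore utility sketched above could look roughly like this; `UserCodeContext` and `wrap` are hypothetical names for illustration, not existing Beam API:

```java
import java.util.function.Supplier;

// Hypothetical sketch: capture the thread-local "user code context" on the
// @ProcessElement thread and restore it around user code that runs on a
// different thread (e.g. a worker pool). None of these names are Beam API.
class UserCodeContext {
  private static final ThreadLocal<UserCodeContext> CURRENT = new ThreadLocal<>();
  private final String description; // stand-in for side inputs, options, etc.

  UserCodeContext(String description) { this.description = description; }
  String description() { return description; }

  static void setCurrent(UserCodeContext ctx) { CURRENT.set(ctx); }
  static UserCodeContext current() { return CURRENT.get(); }

  // Capture the calling thread's context; the returned Supplier installs it
  // (and restores the previous one) on whatever thread eventually runs it.
  static <T> Supplier<T> wrap(Supplier<T> userCode) {
    UserCodeContext captured = CURRENT.get();
    return () -> {
      UserCodeContext previous = CURRENT.get();
      CURRENT.set(captured);
      try {
        return userCode.get();
      } finally {
        CURRENT.set(previous);
      }
    };
  }
}
```

A pool thread executing the wrapped callable then sees the same context the ProcessElement thread had, which is also what profiling utilities would need to attribute that work.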


> It may be
> *more* confusing for this to sometimes work magically and sometimes fail
> horribly. Also, requiring the PCollectionView to be passed to user code
> that accesses it is nice because it makes *very clear* that the side input
> needs to be provided from the DoFn to that particular utility. If it is
> accessed via "spooky action at a distance" we lose that piece of "free"
> documentation, which may lead to extensive misuse of these utility methods.
>
I'd like to understand this concern better - from this description it's not
clear to me. The pattern I'm proposing is that, when you're authoring a
PTransform that is configured by any user callbacks, then:
- you should provide a builder method .withSideInputs(...)
- you should propagate those side inputs to all your internal DoFn's that
invoke the user code
- in return the user callbacks will be allowed to access those particular
side inputs
This seems like a simple enough model to me to understand, both from a
user's perspective and from a transform author's perspective. Steps 1 and 2
may eventually be automated by annotation analysis or other means (e.g. SDK
giving a way to provide given side inputs automatically to everything
inside a composite transform rather than to individual DoFn's).
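A toy model of those three steps (these classes are illustrative stand-ins, not Beam's: `SideInputRegistry` plays the role of the runner-provided accessor, `MyTransform` the role of a PTransform with a builder method):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Stand-in for the accessor the runner would hand to user code: only side
// inputs declared up front are readable.
class SideInputRegistry {
  private final Map<String, Object> values = new HashMap<>();

  void provide(String name, Object value) { values.put(name, value); }

  Object get(String name) {
    Object v = values.get(name);
    if (v == null) {
      throw new IllegalStateException(
          "Side input '" + name + "' was not declared via withSideInput()");
    }
    return v;
  }
}

// Stand-in for a PTransform configured by a user callback.
class MyTransform {
  private final Function<SideInputRegistry, String> userCallback;
  private final SideInputRegistry sideInputs = new SideInputRegistry();

  MyTransform(Function<SideInputRegistry, String> userCallback) {
    this.userCallback = userCallback;
  }

  // Step 1: builder method declaring the side inputs the callback may use.
  MyTransform withSideInput(String name, Object value) {
    sideInputs.provide(name, value);
    return this;
  }

  // Steps 2-3: the declared side inputs are propagated to the point where
  // the user callback is invoked, and only those are accessible there.
  String run() {
    return userCallback.apply(sideInputs);
  }
}
```

In this model, accessing an undeclared side input fails with a pointed error message rather than silently, rather than "spooky action at a distance".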


>
> On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov
>  wrote:
>
> > Hi,
> >
> > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles 
> > wrote:
> >
> > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <
> > > kirpic...@google.com.invalid> wrote:
> > >
> > > >
> > > > The differences are:
> > > > - The proposal in the doc allows wiring different side inputs to the
> > same
> > > > Supplier, but I'm not convinced that this is important - you can just
> > as
> > > > easily call the constructor of your DoFn passing different
> > > > PCollectionView's for it to capture.
> > > >
> > >
> > > I disagree with this bit about it being "just as easy". Passing the
> > needed
> > > PCollectionViews to your constructor (or even having a constructor) is
> a
> > > pain. Every time I have to do it, it adds a ton of boilerplate that
> feels
> > > like pure noise. To make a DoFn reusable it must be made into a named
> > class
> > > with a constructor, versus inlined with no constructor.
> >
> > Hm, why? You can have the DoFn be an anonymous class capturing the
> > PCollectionView into a @SideInput field as a closure.
> >
> >
> > > A generous analogy
> > > is that it is "just" manual closure conversion/currying, changing
> > > f(side, main) to f(side)(main). But in practice in Beam the second one
> > has
> > > much more boilerplate.
> > >
> > > Also, Beam is worse. We present the user with higher-order functions,
> > which
> > > is where the actual annoyance comes in. When you want to pardo(f) you
> > have
> > > to write pardo(f(side))(side, main). Your proposal is to support
> > > pardo(f(side))(main) and mine is to support pardo(f)(side, main). I
> still
> > > propose that we support both (as they get implemented). If you buy in
> to
> > my
> > > analogy, then there's decades of precedent and the burden of proof
> falls
> > > heavily on whoever doesn't want to support both.
> > >
> > I see your point. I think the proposal is compatible with what you're
> > suggesting too - in DoFn we could have @SideInput *parameters* of type
> > PCollectionView, with the same semantics as a field.
> >
> >
> > >
> > > - My proposal allows getting rid of .withSideInputs() entirely, because
> > the
> > > > DoFn captures the PCollectionView so you don't need 

Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Ben Chambers
One possible issue with this is that updating a thread local is likely to
be much more expensive than passing an additional argument. Also, not all
code called from within the DoFn will necessarily be in the same thread
(e.g., sometimes we create a pool of threads for doing work). It may be
*more* confusing for this to sometimes work magically and sometimes fail
horribly. Also, requiring the PCollectionView to be passed to user code
that accesses it is nice because it makes *very clear* that the side input
needs to be provided from the DoFn to that particular utility. If it is
accessed via "spooky action at a distance" we lose that piece of "free"
documentation, which may lead to extensive misuse of these utility methods.

On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov
 wrote:

> Hi,
>
> On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles 
> wrote:
>
> > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> > >
> > > The differences are:
> > > - The proposal in the doc allows wiring different side inputs to the
> same
> > > Supplier, but I'm not convinced that this is important - you can just
> as
> > > easily call the constructor of your DoFn passing different
> > > PCollectionView's for it to capture.
> > >
> >
> > I disagree with this bit about it being "just as easy". Passing the
> needed
> > PCollectionViews to your constructor (or even having a constructor) is a
> > pain. Every time I have to do it, it adds a ton of boilerplate that feels
> > like pure noise. To make a DoFn reusable it must be made into a named
> class
> > with a constructor, versus inlined with no constructor.
>
> Hm, why? You can have the DoFn be an anonymous class capturing the
> PCollectionView into a @SideInput field as a closure.
>
>
> > A generous analogy
> > is that it is "just" manual closure conversion/currying, changing
> > f(side, main) to f(side)(main). But in practice in Beam the second one
> has
> > much more boilerplate.
> >
> > Also, Beam is worse. We present the user with higher-order functions,
> which
> > is where the actual annoyance comes in. When you want to pardo(f) you
> have
> > to write pardo(f(side))(side, main). Your proposal is to support
> > pardo(f(side))(main) and mine is to support pardo(f)(side, main). I still
> > propose that we support both (as they get implemented). If you buy in to
> my
> > analogy, then there's decades of precedent and the burden of proof falls
> > heavily on whoever doesn't want to support both.
> >
> I see your point. I think the proposal is compatible with what you're
> suggesting too - in DoFn we could have @SideInput *parameters* of type
> PCollectionView, with the same semantics as a field.
>
>
> >
> > - My proposal allows getting rid of .withSideInputs() entirely, because
> the
> > > DoFn captures the PCollectionView so you don't need to specify it
> > > explicitly for wiring.
> > >
> >
> > I've decided to change to full +1 (whatever that means compared to 0.75
> :-)
> > to adding support for @SideInput fields, because the benefits outweigh
> this
> > failure mode:
> >
> > new DoFn {
> >   // forgot the annotation
> >   private final PCollectionView whatever;
> >
> >   @ProcessElement public void process(...) {
> > whatever.get(); // crash during execution
> >   }
> > }
> >
> > But ideas to mitigate that would be cool.
>
> Hm, can't think of anything less hacky than "prohibit having fields of type
> PCollectionView that are not public, final, and annotated with @SideInput"
> - not sure we'd want to go down this road. I suppose a good error message
> in .get() would be sufficient, saying "Did you forget to specify a
> requirement for this side input via .withSideInputs() or by annotating the
> field as @SideInput" or something like that.
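A rough sketch of the annotation-analysis direction discussed above: the SDK could reflect over a DoFn's fields to find `@SideInput`-annotated views automatically. This toy version uses plain Java; the annotation and scanner here are hypothetical, not Beam's:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation for side-input fields.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SideInput {}

// Toy scanner: collect the values of all @SideInput fields of a DoFn-like
// object, so the SDK could wire them up without an explicit
// .withSideInputs(...) call.
class SideInputScanner {
  static List<Object> findSideInputs(Object fn) {
    List<Object> views = new ArrayList<>();
    for (Field f : fn.getClass().getDeclaredFields()) {
      if (f.isAnnotationPresent(SideInput.class)) {
        f.setAccessible(true);
        try {
          views.add(f.get(fn));
        } catch (IllegalAccessException e) {
          throw new RuntimeException(e);
        }
      }
    }
    return views;
  }
}
```

A scan like this could also run at graph-construction time and reject a DoFn whose un-annotated PCollectionView field would otherwise crash at .get() during execution.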
>
> >
>
>
> > Kenn
> >
> >
> > >
> > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik 
> > > wrote:
> > >
> > > > My concern with the proposal is not the specifics of how it will work
> > and
> > > > more about it being yet another way on how our API is to be used even
> > > > though we have a proposal [1] of an API style we were working towards
> > in
> > > > Java and Python. I would rather re-open that discussion now about
> what
> > we
> > > > want that API to look like for our major features and work towards
> > > > consistency (or not if there is a strong argument as to why some
> > feature
> > > > should have a different style).
> > > >
> > > > 1: https://s.apache.org/a-new-dofn
> > > >
> > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles
> >  > > >
> > > > wrote:
> > > >
> > > > > +0.75 because I'd like to bring up invalid pipelines.
> > > > >
> > > > > I had proposed side inputs as parameters to DoFn in
> > > > > https://s.apache.org/a-new-dofn (specifically at [1]) so the only
> > > place
> > > > > they are specified is in the graph construction, making the DoFn
> more
> > > > 

Re: BigQueryIO withSchemaFromView

2017-09-13 Thread Reuven Lax
Ah, so you are loading each window into a separate BigQuery table? That
might be the reason things are slow. Remember that a batch job doesn't return
until everything finishes, and if you are loading that many tables it's
entirely possible that BigQuery will throttle you, causing the slowdown.

A couple of options:

1. Instead of loading into separate BigQuery tables, you could load into
separate partitions of the same table. See this page for more info:
https://cloud.google.com/bigquery/docs/partitioned-tables

2. If you have a streaming unbounded source for your data, you can run
using a streaming runner. That will load each window as it becomes
available instead of waiting for everything to load.
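For option 1, a daily partition is addressed by appending a `$YYYYMMDD` decorator to the table name. A minimal sketch of computing that decorator from an element timestamp (the dataset and table names are placeholders; in Beam this string would typically be produced by the function given to BigQueryIO's dynamic-destination `.to(...)` overloads):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Build a BigQuery partition decorator ("dataset.table$YYYYMMDD") for a
// daily partitioned table from an element timestamp. Dataset and table
// names here are placeholders.
class PartitionDecorator {
  private static final DateTimeFormatter DAY =
      DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

  static String forTimestamp(String dataset, String table, Instant timestamp) {
    return dataset + "." + table + "$" + DAY.format(timestamp);
  }
}
```

Loading into, say, "mydataset.events$20170912" targets only that day's partition instead of creating a separate table per window.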

Reuven

On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel  wrote:

> From what I found, if I have the windowing with BigQuery partitions (per
> day - 1545 partitions) the insert can take 5 hours, whereas if there are
> no partitions it takes about 12 minutes.
>
> I have 13,843,080 records, 6.76 GB.
> Any ideas how to get the partitioned load to work faster?
>
> Is there a way to get the BigQueryIO to use streaming and not jobs?
>
> chaim
>
> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel  wrote:
> > I am using windowing for the partition of the table, maybe that has to do
> with it?
> >
> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax 
> wrote:
> >> Ok, something is going wrong then. It appears that your job created over
> >> 14,000 BigQuery load jobs, which is not expected (and probably why
> things
> >> were so slow).
> >>
> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel  wrote:
> >>
> >>> no that specific job created only 2 tables
> >>>
> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax 
> >>> wrote:
> >>> > It looks like your job is creating about 14,45 distinct BigQuery
> tables.
> >>> > Does that sound correct to you?
> >>> >
> >>> > Reuven
> >>> >
> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel 
> wrote:
> >>> >
> >>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
> >>> >> as you can see the majority of the time is inserting into bigquery.
> >>> >> is there any way to parallel this?
> >>> >>
> >>> >> My feeling for the windowing is that writing should be done per
> window
> >>> >> (my window is daily) or at least to be able to configure it
> >>> >>
> >>> >> chaim
> >>> >>
> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax
> 
> >>> >> wrote:
> >>> >> > So the problem is you are running on Dataflow, and it's taking
> longer
> >>> >> than
> >>> >> > you think it should? If you provide the Dataflow job id we can
> help
> >>> you
> >>> >> > debug why it's taking 30 minutes. (and as an aside, if this turns
> >>> into a
> >>> >> > Dataflow debugging session we should move it off of the Beam list
> and
> >>> >> onto
> >>> >> > a Dataflow-specific thread)
> >>> >> >
> >>> >> > Reuven
> >>> >> >
> >>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel 
> >>> wrote:
> >>> >> >
> >>> >> >> is there a way around this, my time for 13gb is now close to 30
> >>> >> >> minutes, while it should be around 15 minutes.
> >>> >> >> Do i need to chunk the code myself to windows, and run in
> parallel?
> >>> >> >> chaim
> >>> >> >>
> >>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax
>  >>> >
> >>> >> >> wrote:
> >>> >> >> > In that case I can say unequivocally that Dataflow (in batch
> mode)
> >>> >> does
> >>> >> >> not
> >>> >> >> > produce results for a stage until it has processed that entire
> >>> stage.
> >>> >> The
> >>> >> >> > reason for this is that the batch runner is optimized for
> >>> throughput,
> >>> >> not
> >>> >> >> > latency; it wants to minimize the time for the entire job to
> >>> finish,
> >>> >> not
> >>> >> >> > the time till first output. The side input will not be
> materialized
> >>> >> until
> >>> >> >> > all of the data for all of the windows of the side input have
> been
> >>> >> >> > processed. The streaming runner on the other hand will produce
> >>> >> windows as
> >>> >> >> > they finish. So for the batch runner, there is no performance
> >>> >> advantage
> >>> >> >> you
> >>> >> >> > get for windowing the side input.
> >>> >> >> >
> >>> >> >> > The fact that BigQueryIO needs the schema side input to be
> globally
> >>> >> >> > windowed is a bit confusing and not well documented. We should
> add
> >>> >> better
> >>> >> >> > javadoc explaining this.
> >>> >> >> >
> >>> >> >> > Reuven
> >>> >> >> >
> >>> >> >> > On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <
> ch...@behalf.com>
> >>> >> wrote:
> >>> >> >> >
> >>> >> >> >> batch on dataflow
> >>> >> >> >>
> >>> >> >> >> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax
> >>>  >>> >> >
> >>> >> >> >> wrote:
> >>> >> >> >> > Which runner are you using? And is this a batch pipeline?
> >>> >> >> >> >
> >>> >> >> >> > On Sat, Sep 

Re: BigQueryIO withSchemaFromView

2017-09-13 Thread Lukasz Cwik
Support was added to expose how users want to load their data with
https://github.com/apache/beam/commit/075d4d45a9cd398f3b4023b6efd495cc58eb9bdd
It is planned to be released in 2.2.0

On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel  wrote:

> From what I found, if I have windowing with BigQuery partitions (per
> day - 1545 partitions) the insert can take 5 hours, whereas with
> no partitions it takes about 12 minutes
>
> I have 13,843,080 records, 6.76 GB.
> Any ideas how to get the partitioned insert to work faster?
>
> Is there a way to get the BigQueryIO to use streaming and not jobs?
>
> chaim
>
> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel  wrote:
> > i am using windowing for the partition of the table, maybe that has to do
> > with it?
> >
> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax 
> wrote:
> >> Ok, something is going wrong then. It appears that your job created over
> >> 14,000 BigQuery load jobs, which is not expected (and probably why
> things
> >> were so slow).
> >>
> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel  wrote:
> >>
> >>> no that specific job created only 2 tables
> >>>
> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax 
> >>> wrote:
> >>> > It looks like your job is creating about 14,45 distinct BigQuery
> tables.
> >>> > Does that sound correct to you?
> >>> >
> >>> > Reuven
> >>> >
> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel 
> wrote:
> >>> >
> >>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
> >>> >> as you can see the majority of the time is inserting into bigquery.
> >>> >> is there any way to parallel this?
> >>> >>
> >>> >> My feeling for the windowing is that writing should be done per
> window
> >>> >> (my window is daily) or at least to be able to configure it
> >>> >>
> >>> >> chaim
> >>> >>
> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax
> 
> >>> >> wrote:
> >>> >> > So the problem is you are running on Dataflow, and it's taking
> longer
> >>> >> than
> >>> >> > you think it should? If you provide the Dataflow job id we can
> help
> >>> you
> >>> >> > debug why it's taking 30 minutes. (and as an aside, if this turns
> >>> into a
> >>> >> > Dataflow debugging session we should move it off of the Beam list
> and
> >>> >> onto
> >>> >> > a Dataflow-specific thread)
> >>> >> >
> >>> >> > Reuven
> >>> >> >
> >>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel 
> >>> wrote:
> >>> >> >
> >>> >> >> is there a way around this, my time for 13gb is not close to 30
> >>> >> >> minutes, while it should be around 15 minutes.
> >>> >> >> Do i need to chunk the code myself to windows, and run in
> parallel?
> >>> >> >> chaim
> >>> >> >>
> >>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax
>  >>> >
> >>> >> >> wrote:
> >>> >> >> > In that case I can say unequivocally that Dataflow (in batch
> mode)
> >>> >> does
> >>> >> >> not
> >>> >> >> > produce results for a stage until it has processed that entire
> >>> stage.
> >>> >> The
> >>> >> >> > reason for this is that the batch runner is optimized for
> >>> throughput,
> >>> >> not
> >>> >> >> > latency; it wants to minimize the time for the entire job to
> >>> finish,
> >>> >> not
> >>> >> >> > the time till first output. The side input will not be
> materialized
> >>> >> until
> >>> >> >> > all of the data for all of the windows of the side input have
> been
> >>> >> >> > processed. The streaming runner on the other hand will produce
> >>> >> windows as
> >>> >> >> > they finish. So for the batch runner, there is no performance
> >>> >> advantage
> >>> >> >> you
> >>> >> >> > get for windowing the side input.
> >>> >> >> >
> >>> >> >> > The fact that BigQueryIO needs the schema side input to be
> globally
> >>> >> >> > windowed is a bit confusing and not well documented. We should
> add
> >>> >> better
> >>> >> >> > javadoc explaining this.
> >>> >> >> >
> >>> >> >> > Reuven
> >>> >> >> >
> >>> >> >> > On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <
> ch...@behalf.com>
> >>> >> wrote:
> >>> >> >> >
> >>> >> >> >> batch on dataflow
> >>> >> >> >>
> >>> >> >> >> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax
> >>>  >>> >> >
> >>> >> >> >> wrote:
> >>> >> >> >> > Which runner are you using? And is this a batch pipeline?
> >>> >> >> >> >
> >>> >> >> >> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <
> ch...@behalf.com
> >>> >
> >>> >> >> wrote:
> >>> >> >> >> >
> >>> >> >> >> >> Thank for the answer, but i don't think that that is the
> case.
> >>> >> From
> >>> >> >> >> >> what i have seen, since i have other code to update status
> >>> based
> >>> >> on
> >>> >> >> >> >> the window, it does get called before all the windows are
> >>> >> calculated.
> >>> >> >> >> >> There is no logical reason to wait, once the window has
> >>> finished,
> >>> >> the
> >>> >> >> >> >> rest of the 

Re: Migration From 1.9.x to 2.1.0

2017-09-13 Thread Eugene Kirpichov
The full set of changes is described in
https://cloud.google.com/dataflow/release-notes/release-notes-java-2

On Wed, Sep 13, 2017 at 8:53 AM Thomas Groh 
wrote:

> for (1) and (4), the DoFn methods have been moved to be reflection based.
> Instead of using `@Override` in your DoFns, you should annotate those
> methods with `@StartBundle`, `@ProcessElement`, and `@FinishBundle`
> instead.
>
> For (2), Aggregators have been removed. Our suggested replacement is the
> use of the `Metrics` class - in this case, a Counter metric is appropriate.
>
> For (3), `sideOutput` has been renamed to `output`; the use is otherwise
> identical.
>
> for (5), the pattern has changed from `TextIO.Read.from(...)` to
> `TextIO.read().from(...)` (which should allow the remainder of the
> PTransform to also be configured without having to specify a Filepattern up
> front)
>
> On Tue, Sep 12, 2017 at 8:39 PM, Arunkumar Santhanagopalan <
> arunk...@gmail.com> wrote:
>
> > Hi,
> >
> > We are trying to migrate from Dataflow 1.9.x to Dataflow 2.1.0
> >
> > I need help with the following changes
> >
> >
> > 1.
> > class Join extends DoFn {
> > @Override
> > public void startBundle(Context c) throws Exception {
> > super.startBundle(c);
> > createParser();
> > }
> >
> > Method "startBundle" does not override method startBundle from its
> > superclass
> >
> >
> > 2.
> > class Join extends DoFn{
> >   private final Aggregator duplicatesCount =
> > createAggregator(DUPLICATES_COUNTER, new Sum.SumLongFn());
> >
> > cannot resolve method 'createAggregator, Sum.SumLongFn has a private
> access
> >
> > 3.
> > class Join extends DoFn{
> >   public void processElement(ProcessContext c) {
> >  c.sideOutput(duplicatesTag, s)
> >   }
> > cannot resolve method sideOutput(org.apache.beam.sdk.values.TupleTag)
> >
> >
> > 4.
> > public abstract class ReadCsv extends DoFn {
> >
> > @Override
> > public final void processElement(ProcessContext c) throws Exception {
> > T output = processElement(c.element(), c);
> > if (output != null) {
> > c.output(output);
> > }
> > }
> > Method does not override method processElement from its superclass
> >
> >
> > 5.
> > import org.apache.beam.sdk.io.TextIO;
> >
> > TextIO.Read.from("gs://spins/data/part-*")
> > Non-static method "from" cannot be referenced from static context
> >
>


Re: Migration From 1.9.x to 2.1.0

2017-09-13 Thread Thomas Groh
for (1) and (4), the DoFn methods have been moved to be reflection based.
Instead of using `@Override` in your DoFns, you should annotate those
methods with `@StartBundle`, `@ProcessElement`, and `@FinishBundle` instead.
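As a toy illustration of what "reflection based" means here — a self-contained sketch using plain JDK reflection, not Beam's actual DoFn invoker machinery (the annotation below is a hypothetical stand-in, defined locally):

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;

public class AnnotationDispatchDemo {

  // Toy stand-in for Beam's @ProcessElement, for illustration only.
  @Retention(RetentionPolicy.RUNTIME)
  @interface ProcessElement {}

  // A user "DoFn": the framework never sees an overridden method;
  // it looks for the annotation at runtime instead.
  static class MyFn {
    @ProcessElement
    public String process(String input) {
      return input.toUpperCase();
    }
  }

  // Locate the annotated method reflectively and invoke it -- roughly
  // how an annotation-driven framework dispatches user callbacks.
  static String dispatch(Object fn, String input) {
    for (Method m : fn.getClass().getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)) {
        try {
          return (String) m.invoke(fn, input);
        } catch (ReflectiveOperationException e) {
          throw new RuntimeException(e);
        }
      }
    }
    throw new IllegalStateException("no @ProcessElement method found");
  }

  public static void main(String[] args) {
    System.out.println(dispatch(new MyFn(), "hello"));  // prints HELLO
  }
}
```

This is why the `@Override` errors in (1) and (4) appear: the 2.x superclass simply has no `startBundle`/`processElement` methods to override.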

For (2), Aggregators have been removed. Our suggested replacement is the
use of the `Metrics` class - in this case, a Counter metric is appropriate.

For (3), `sideOutput` has been renamed to `output`; the use is otherwise
identical.

for (5), the pattern has changed from `TextIO.Read.from(...)` to
`TextIO.read().from(...)` (which should allow the remainder of the
PTransform to also be configured without having to specify a Filepattern up
front)
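Pulling (1)-(5) together, the migrated DoFn from the question might look like this (a sketch assuming beam-sdks-java-core 2.1.0 on the classpath; `duplicatesTag` and the counter name come from the question, and `createParser()` is the user's own helper):

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

class Join extends DoFn<String, String> {
  // (2) Metrics.counter replaces createAggregator(..., new Sum.SumLongFn())
  private final Counter duplicatesCount = Metrics.counter(Join.class, "duplicatesCount");
  private final TupleTag<String> duplicatesTag = new TupleTag<String>() {};

  // (1) annotation instead of @Override; no super.startBundle(c) call
  @StartBundle
  public void startBundle() {
    createParser();
  }

  // (4) discovered reflectively via the annotation
  @ProcessElement
  public void processElement(ProcessContext c) {
    duplicatesCount.inc();
    c.output(duplicatesTag, c.element());  // (3) sideOutput -> output
  }

  private void createParser() { /* user code */ }
}

// (5) static factory method instead of the static TextIO.Read.from style:
// TextIO.read().from("gs://spins/data/part-*")
```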

On Tue, Sep 12, 2017 at 8:39 PM, Arunkumar Santhanagopalan <
arunk...@gmail.com> wrote:

> Hi,
>
> We are trying to migrate from Dataflow 1.9.x to Dataflow 2.1.0
>
> I need help with the following changes
>
>
> 1.
> class Join extends DoFn {
> @Override
> public void startBundle(Context c) throws Exception {
> super.startBundle(c);
> createParser();
> }
>
> Method "startBundle" does not override method startBundle from its
> superclass
>
>
> 2.
> class Join extends DoFn{
>   private final Aggregator duplicatesCount =
> createAggregator(DUPLICATES_COUNTER, new Sum.SumLongFn());
>
> cannot resolve method 'createAggregator, Sum.SumLongFn has a private access
>
> 3.
> class Join extends DoFn{
>   public void processElement(ProcessContext c) {
>  c.sideOutput(duplicatesTag, s)
>   }
> cannot resolve method sideOutput(org.apache.beam.sdk.values.TupleTag)
>
>
> 4.
> public abstract class ReadCsv extends DoFn {
>
> @Override
> public final void processElement(ProcessContext c) throws Exception {
> T output = processElement(c.element(), c);
> if (output != null) {
> c.output(output);
> }
> }
> Method does not override method processElement from its superclass
>
>
> 5.
> import org.apache.beam.sdk.io.TextIO;
>
> TextIO.Read.from("gs://spins/data/part-*")
> Non-static method "from" cannot be referenced from static context
>


Migration From 1.9.x to 2.1.0

2017-09-13 Thread Arunkumar Santhanagopalan
Hi,

We are trying to migrate from Dataflow 1.9.x to Dataflow 2.1.0

I need help with the following changes


1.
class Join extends DoFn {
@Override
public void startBundle(Context c) throws Exception {
super.startBundle(c);
createParser();
}

Method "startBundle" does not override method startBundle from its
superclass


2.
class Join extends DoFn{
  private final Aggregator duplicatesCount =
createAggregator(DUPLICATES_COUNTER, new Sum.SumLongFn());

cannot resolve method 'createAggregator, Sum.SumLongFn has a private access

3.
class Join extends DoFn{
  public void processElement(ProcessContext c) {
 c.sideOutput(duplicatesTag, s)
  }
cannot resolve method sideOutput(org.apache.beam.sdk.values.TupleTag)


4.
public abstract class ReadCsv extends DoFn {

@Override
public final void processElement(ProcessContext c) throws Exception {
T output = processElement(c.element(), c);
if (output != null) {
c.output(output);
}
}
Method does not override method processElement from its superclass


5.
import org.apache.beam.sdk.io.TextIO;

TextIO.Read.from("gs://spins/data/part-*")
Non-static method "from" cannot be referenced from static context


Build failed in Jenkins: beam_Release_NightlySnapshot #533

2017-09-13 Thread Apache Jenkins Server
See 


Changes:

[chamikara] Add first few BigtableWriteException to suppressed list when 
rethrowing

[mingmxu] [BEAM-2804] support TIMESTAMP in sort

[chamikara] Fix GqlQueryTranslateFn to pass localhost option to DatastoreFactory

[klk] Improve error message for bad DoFn URN in ParDoTranslation

[klk] Key DoFnInstanceManager cache on AppliedPTransform

[klk] Add fast path to ParDoTranslation for known ParDo class

[owenzhang1990] [BEAM-2632] Use Junit Paramaterized test suits in TextIOReadTest

--
[...truncated 2.20 MB...]
[JENKINS] Archiving disabled
2017-09-13T07:58:26.774 [INFO] 

2017-09-13T07:58:26.774 [INFO] Reactor Summary:
2017-09-13T07:58:26.774 [INFO] 
2017-09-13T07:58:26.774 [INFO] Apache Beam :: Parent .. SUCCESS [ 45.738 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: Build Tools . SUCCESS [ 36.771 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs  SUCCESS [  7.760 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Common .. SUCCESS [  4.765 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Common :: Runner API  SUCCESS [ 43.691 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Common :: Fn API  SUCCESS [ 25.364 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java  SUCCESS [  5.350 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: Core  SUCCESS [03:03 min]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: Runners . SUCCESS [  4.939 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: Runners :: Core Construction Java ... SUCCESS [ 46.878 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: Runners :: Core Java  SUCCESS [01:04 min]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: Runners :: Direct Java .. SUCCESS [04:48 min]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: IO .. SUCCESS [  4.976 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: IO :: AMQP .. SUCCESS [ 48.323 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: IO :: Common  SUCCESS [ 14.033 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: IO :: Cassandra . SUCCESS [ 38.920 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: IO :: Elasticsearch . SUCCESS [ 47.592 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: Extensions .. SUCCESS [  4.804 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: Extensions :: Google Cloud Platform Core SUCCESS [ 37.803 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: Extensions :: Protobuf SUCCESS [ 39.350 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform SUCCESS [01:53 min]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: IO :: Hadoop Common . SUCCESS [ 41.333 s]
2017-09-13T07:58:26.774 [INFO] Apache Beam :: SDKs :: Java :: IO :: Hadoop File System SUCCESS [

Re: BigQueryIO withSchemaFromView

2017-09-13 Thread Chaim Turkel
From what I found, if I have windowing with BigQuery partitions (per
day - 1545 partitions) the insert can take 5 hours, whereas with
no partitions it takes about 12 minutes

I have 13,843,080 records, 6.76 GB.
Any ideas how to get the partitioned insert to work faster?

Is there a way to get the BigQueryIO to use streaming and not jobs?

chaim
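
One pattern that may cut per-partition overhead is routing each row to a
BigQuery partition decorator through the dynamic-destination overload of
`to(...)` (a sketch, assuming a Beam 2.x SDK; `formatDay` is a hypothetical
helper that renders an element's timestamp as yyyyMMdd):

```java
BigQueryIO.writeTableRows()
    .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>)
        row -> new TableDestination(
            // "$yyyyMMdd" is BigQuery's partition decorator syntax
            "my-project:my_dataset.my_table$" + formatDay(row.getTimestamp()),
            null))
    .withSchema(schema);
```

Whether this reduces the load-job count depends on how the runner batches
destinations, so it is worth measuring against the streaming-insert path.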

On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel  wrote:
> i am using windowing for the partition of the table, maybe that has to do with
> it?
>
> On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax  wrote:
>> Ok, something is going wrong then. It appears that your job created over
>> 14,000 BigQuery load jobs, which is not expected (and probably why things
>> were so slow).
>>
>> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel  wrote:
>>
>>> no that specific job created only 2 tables
>>>
>>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax 
>>> wrote:
>>> > It looks like your job is creating about 14,45 distinct BigQuery tables.
>>> > Does that sound correct to you?
>>> >
>>> > Reuven
>>> >
>>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel  wrote:
>>> >
>>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
>>> >> as you can see the majority of the time is inserting into bigquery.
>>> >> is there any way to parallel this?
>>> >>
>>> >> My feeling for the windowing is that writing should be done per window
>>> >> (my window is daily) or at least to be able to configure it
>>> >>
>>> >> chaim
>>> >>
>>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax 
>>> >> wrote:
>>> >> > So the problem is you are running on Dataflow, and it's taking longer
>>> >> than
>>> >> > you think it should? If you provide the Dataflow job id we can help
>>> you
>>> >> > debug why it's taking 30 minutes. (and as an aside, if this turns
>>> into a
>>> >> > Dataflow debugging session we should move it off of the Beam list and
>>> >> onto
>>> >> > a Dataflow-specific thread)
>>> >> >
>>> >> > Reuven
>>> >> >
>>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel 
>>> wrote:
>>> >> >
>>> >> >> is there a way around this, my time for 13gb is not close to 30
>>> >> >> minutes, while it should be around 15 minutes.
>>> >> >> Do i need to chunk the code myself to windows, and run in parallel?
>>> >> >> chaim
>>> >> >>
>>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax >> >
>>> >> >> wrote:
>>> >> >> > In that case I can say unequivocally that Dataflow (in batch mode)
>>> >> does
>>> >> >> not
>>> >> >> > produce results for a stage until it has processed that entire
>>> stage.
>>> >> The
>>> >> >> > reason for this is that the batch runner is optimized for
>>> throughput,
>>> >> not
>>> >> >> > latency; it wants to minimize the time for the entire job to
>>> finish,
>>> >> not
>>> >> >> > the time till first output. The side input will not be materialized
>>> >> until
>>> >> >> > all of the data for all of the windows of the side input have been
>>> >> >> > processed. The streaming runner on the other hand will produce
>>> >> windows as
>>> >> >> > they finish. So for the batch runner, there is no performance
>>> >> advantage
>>> >> >> you
>>> >> >> > get for windowing the side input.
>>> >> >> >
>>> >> >> > The fact that BigQueryIO needs the schema side input to be globally
>>> >> >> > windowed is a bit confusing and not well documented. We should add
>>> >> better
>>> >> >> > javadoc explaining this.
>>> >> >> >
>>> >> >> > Reuven
>>> >> >> >
>>> >> >> > On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel 
>>> >> wrote:
>>> >> >> >
>>> >> >> >> batch on dataflow
>>> >> >> >>
>>> >> >> >> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax
>>> >> >> >
>>> >> >> >> wrote:
>>> >> >> >> > Which runner are you using? And is this a batch pipeline?
>>> >> >> >> >
>>> >> >> >> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel >> >
>>> >> >> wrote:
>>> >> >> >> >
>>> >> >> >> >> Thank for the answer, but i don't think that that is the case.
>>> >> From
>>> >> >> >> >> what i have seen, since i have other code to update status
>>> based
>>> >> on
>>> >> >> >> >> the window, it does get called before all the windows are
>>> >> calculated.
>>> >> >> >> >> There is no logical reason to wait, once the window has
>>> finished,
>>> >> the
>>> >> >> >> >> rest of the pipeline should run and the BigQuery should start
>>> to
>>> >> >> write
>>> >> >> >> >> the results.
>>> >> >> >> >>
>>> >> >> >> >>
>>> >> >> >> >>
>>> >> >> >> >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax
>>> >> >> >> >> >
>>> >> >> >> >> wrote:
>>> >> >> >> >> > Logically the BigQuery write does not depend on windows, and
>>> >> >> writing
>>> >> >> >> it
>>> >> >> >> >> > windowed would result in incorrect output. For this reason,
>>> >> >> BigQueryIO
>>> >> >> >> >> > rewindows into global windows