Re: Can we allow SimpleFunction and SerializableFunction to throw Exception?

2018-11-28 Thread Kenneth Knowles
Nice! A clean solution and an opportunity to bikeshed on names. This has
everything I love.

Kenn

On Wed, Nov 28, 2018 at 6:43 PM Jeff Klukas  wrote:

> It looks like we can make the new interface a superinterface for the
> existing SerializableFunction while maintaining binary compatibility [0].
>
> We'd have:
>
> public interface NewSerializableFunction<InputT, OutputT> extends
> Serializable {
>   OutputT apply(InputT input) throws Exception;
> }
>
> and then modify SerializableFunction to inherit from it:
>
> public interface SerializableFunction<InputT, OutputT> extends
> NewSerializableFunction<InputT, OutputT>, Serializable {
>   @Override
>   OutputT apply(InputT input);
> }
>
>
> IIUC, we can then more or less replace all references to
> SerializableFunction with NewSerializableFunction across the beam codebase
> without having to introduce any new overrides. I'm working on a proof of
> concept now.
>
> It's not clear what the actual names for NewSerializableFunction and
> NewSimpleFunction should be.
>
> [0]
> https://docs.oracle.com/javase/specs/jls/se8/html/jls-13.html#jls-13.4.4
>
>
> On Mon, Nov 26, 2018 at 9:54 PM Thomas Weise  wrote:
>
>> +1 for introducing the new interface now and deprecating the old one. The
>> major version change then provides the opportunity to remove deprecated
>> code.
>>
>>
>> On Mon, Nov 26, 2018 at 10:09 AM Lukasz Cwik  wrote:
>>
>>> Before 3.0 we will still want to introduce this, giving people time
>>> to migrate. Would it make sense to do that now and deprecate the
>>> alternatives that it replaces?
>>>
>>> On Mon, Nov 26, 2018 at 5:59 AM Jeff Klukas  wrote:
>>>
 Picking up this thread again. Based on the feedback from Kenn, Reuven,
 and Romain, it sounds like there's no objection to the idea of
 SimpleFunction and SerializableFunction declaring that they throw
 Exception. So the discussion at this point is about whether there's an
 acceptable way to introduce that change.

 IIUC, Kenn was suggesting that we need to ensure backwards
 compatibility for existing user code both at runtime and recompile, which
 means we can't simply add the declaration to the existing interfaces, since
 that would cause errors at compile time for user code directly invoking
 SerializableFunction instances.

 I don't see an obvious way that introducing a new functional interface
 would help without littering the API with more variants (it's already a bit
 confusing that e.g. MapElements has multiple via() methods to support three
 different function interfaces).

 Perhaps this kind of cleanup is best left for Beam 3.0?

 On Mon, Oct 15, 2018 at 6:51 PM Reuven Lax  wrote:

> Compilation compatibility is a big part of what Beam aims to provide
> with its guarantees. Romain makes a good point that most users are not
> invoking SerializableFunctions themselves (they are usually invoked inside
> of Beam classes such as MapElements), however I suspect some users do 
> these
> things.
>
> On Sun, Oct 14, 2018 at 2:38 PM Kenneth Knowles 
> wrote:
>
>> Romain has brought up two good aspects of backwards compatibility:
>>
>>  - runtime replacement vs recompile
>>  - consumer (covariant) vs producer (contravariant, such as
>> interfaces a user implements)
>>
>> In this case, I think the best choice is covariant and contravariant
>> (invariant) backwards compat including recompile compat. But we shouldn't
>> assume there is one obvious definition of "backwards compatibility".
>>
>> Does it help to introduce a new functional interface?
>>
>> Kenn
>>
>> On Sun, Oct 14, 2018 at 6:25 AM Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>> Beam does not catch Exception for function usage so it will have to
>>> do it in some places.
>>>
>>> A user does not have to execute the function so worst case it
>>> impacts tests and in any case the most important: it does not impact the
>>> user until it recompiles the code (= runtime is not impacted).
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau  |  Blog
>>>  | Old Blog
>>>  | Github
>>>  | LinkedIn
>>>  | Book
>>> 
>>>
>>>
>>> Le dim. 14 oct. 2018 à 15:19, Reuven Lax  a
>>> écrit :
>>>
 What in the Beam codebase is not ready, and how do we know that user
 code doesn't have the same issue?

 On Sun, Oct 14, 2018 at 6:04 AM Romain Manni-Bucau <
 rmannibu...@gmail.com> wrote:

> Hmm, tested also and it works until something in the code flow does
> not respect that constraint - see
> com.sun.tools.javac.comp.Flo

Re: Can we allow SimpleFunction and SerializableFunction to throw Exception?

2018-11-28 Thread Jeff Klukas
It looks like we can make the new interface a superinterface for the
existing SerializableFunction while maintaining binary compatibility [0].

We'd have:

public interface NewSerializableFunction<InputT, OutputT> extends
Serializable {
  OutputT apply(InputT input) throws Exception;
}

and then modify SerializableFunction to inherit from it:

public interface SerializableFunction<InputT, OutputT> extends
NewSerializableFunction<InputT, OutputT>, Serializable {
  @Override
  OutputT apply(InputT input);
}


IIUC, we can then more or less replace all references to
SerializableFunction with NewSerializableFunction across the beam codebase
without having to introduce any new overrides. I'm working on a proof of
concept now.

It's not clear what the actual names for NewSerializableFunction and
NewSimpleFunction should be.

[0] https://docs.oracle.com/javase/specs/jls/se8/html/jls-13.html#jls-13.4.4
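
To make the compatibility claim concrete, here is a small self-contained sketch
(separate from the proof of concept; CompatSketch, invoke(), and the lambdas are
made up for illustration, and only the two interfaces mirror the snippets above).
It shows that existing user lambdas still target SerializableFunction without
handling checked exceptions, while a call site widened to NewSerializableFunction
accepts both styles:

import java.io.Serializable;

public class CompatSketch {

  // The proposed superinterface: apply may throw checked exceptions.
  public interface NewSerializableFunction<InputT, OutputT> extends Serializable {
    OutputT apply(InputT input) throws Exception;
  }

  // The existing interface, which narrows the override to not throw.
  public interface SerializableFunction<InputT, OutputT>
      extends NewSerializableFunction<InputT, OutputT>, Serializable {
    @Override
    OutputT apply(InputT input);
  }

  // Stand-in for a Beam-internal call site (e.g. inside MapElements) that is
  // switched to accept the wider type.
  static <InputT, OutputT> OutputT invoke(
      NewSerializableFunction<InputT, OutputT> fn, InputT input) throws Exception {
    return fn.apply(input);
  }

  public static void main(String[] args) throws Exception {
    // Old-style user code keeps compiling unchanged: no checked exceptions to handle.
    SerializableFunction<String, Integer> oldStyle = s -> s.length();

    // New-style user code is free to throw checked exceptions.
    NewSerializableFunction<String, Integer> newStyle = s -> {
      if (s.isEmpty()) {
        throw new Exception("empty input");
      }
      return s.length();
    };

    System.out.println(invoke(oldStyle, "beam"));  // prints 4
    System.out.println(invoke(newStyle, "beam"));  // prints 4
  }
}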


On Mon, Nov 26, 2018 at 9:54 PM Thomas Weise  wrote:

> +1 for introducing the new interface now and deprecating the old one. The
> major version change then provides the opportunity to remove deprecated
> code.
>
>
> On Mon, Nov 26, 2018 at 10:09 AM Lukasz Cwik  wrote:
>
>> Before 3.0 we will still want to introduce this, giving people time to
>> migrate. Would it make sense to do that now and deprecate the alternatives
>> that it replaces?
>>
>> On Mon, Nov 26, 2018 at 5:59 AM Jeff Klukas  wrote:
>>
>>> Picking up this thread again. Based on the feedback from Kenn, Reuven,
>>> and Romain, it sounds like there's no objection to the idea of
>>> SimpleFunction and SerializableFunction declaring that they throw
>>> Exception. So the discussion at this point is about whether there's an
>>> acceptable way to introduce that change.
>>>
>>> IIUC, Kenn was suggesting that we need to ensure backwards
>>> compatibility for existing user code both at runtime and recompile, which
>>> means we can't simply add the declaration to the existing interfaces, since
>>> that would cause errors at compile time for user code directly invoking
>>> SerializableFunction instances.
>>>
>>> I don't see an obvious way that introducing a new functional interface
>>> would help without littering the API with more variants (it's already a bit
>>> confusing that e.g. MapElements has multiple via() methods to support three
>>> different function interfaces).
>>>
>>> Perhaps this kind of cleanup is best left for Beam 3.0?
>>>
>>> On Mon, Oct 15, 2018 at 6:51 PM Reuven Lax  wrote:
>>>
 Compilation compatibility is a big part of what Beam aims to provide
 with its guarantees. Romain makes a good point that most users are not
 invoking SerializableFunctions themselves (they are usually invoked inside
 of Beam classes such as MapElements), however I suspect some users do these
 things.

 On Sun, Oct 14, 2018 at 2:38 PM Kenneth Knowles 
 wrote:

> Romain has brought up two good aspects of backwards compatibility:
>
>  - runtime replacement vs recompile
>  - consumer (covariant) vs producer (contravariant, such as interfaces
> a user implements)
>
> In this case, I think the best choice is covariant and contravariant
> (invariant) backwards compat including recompile compat. But we shouldn't
> assume there is one obvious definition of "backwards compatibility".
>
> Does it help to introduce a new functional interface?
>
> Kenn
>
> On Sun, Oct 14, 2018 at 6:25 AM Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>
>> Beam does not catch Exception for function usage so it will have to
>> do it in some places.
>>
>> A user does not have to execute the function so worst case it impacts
>> tests and in any case the most important: it does not impact the user 
>> until
>> it recompiles the code (= runtime is not impacted).
>>
>> Romain Manni-Bucau
>> @rmannibucau  |  Blog
>>  | Old Blog
>>  | Github
>>  | LinkedIn
>>  | Book
>> 
>>
>>
>> Le dim. 14 oct. 2018 à 15:19, Reuven Lax  a écrit :
>>
>>> What in the Beam codebase is not ready, and how do we know that user
>>> code doesn't have the same issue?
>>>
>>> On Sun, Oct 14, 2018 at 6:04 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
 Hmm, tested also and it works until something in the code flow does
 not respect that constraint - see
 com.sun.tools.javac.comp.Flow.FlowAnalyzer#errorUncaught. In other 
 words
 beam codebase is not ready for that and will make it fail but it is ok
 cause we can fix it but user code does not rely on that so it is fine 
 to
 update it normally.

 Romain Manni-Bucau

Re: Handling large values

2018-11-28 Thread Lukasz Cwik
I don't believe we would need to change any other coders since
SeekableInputStream wouldn't change how a regular InputStream would work so
coders that don't care about the implementation would still use it as a
forward only input stream. Coders that care about seeking would use the new
functionality.

For the encoding portion, the state backed length prefix coder would send
the small snippet of data that it received plus the state key without
invoking the component coder to encode the value. The downstream receiving
party would need to lookup the remote reference to get all the data. All
other coders would not be lazy and would have to encode the entire lazy
view; this could be optimized by copying the SeekableInputStream to
the OutputStream. Note that the length prefix coder is never used with IOs
and hence those IOs could be given a type like Iterable which is lazy,
but the encoding for that wouldn't be lazy and would output all the data
from the SeekableInputStream.
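
Purely as an illustration of the encoding described above, here is a small
self-contained sketch of one possible wire layout: the inline snippet that was
received, followed by either an end marker or a state token that the receiving
party resolves via the state read API. The layout, marker bytes, and names are
assumptions made for this sketch, not what the PR implements:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class StateBackedEncodingSketch {
  private static final byte INLINE_ONLY = 0;         // all bytes are present inline
  private static final byte CONTINUED_IN_STATE = 1;  // remaining bytes live behind a state key

  // Encode the snippet we already have plus, optionally, a token pointing at the rest.
  static byte[] encode(byte[] inlineSnippet, String stateToken) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(inlineSnippet.length);   // length of the inline portion
    out.write(inlineSnippet);             // the snippet received so far, written as-is
    if (stateToken == null) {
      out.writeByte(INLINE_ONLY);
    } else {
      out.writeByte(CONTINUED_IN_STATE);
      byte[] token = stateToken.getBytes(StandardCharsets.UTF_8);
      out.writeInt(token.length);
      out.write(token);                   // the receiver looks this up via the state read API
    }
    out.flush();
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] small = encode("tiny value".getBytes(StandardCharsets.UTF_8), null);
    byte[] large = encode("first chunk of a huge value".getBytes(StandardCharsets.UTF_8), "state-key-42");
    System.out.println("small payload bytes: " + small.length);
    System.out.println("large payload bytes (inline part + token): " + large.length);
  }
}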


On Wed, Nov 28, 2018 at 3:08 PM Robert Bradshaw  wrote:

> On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik  wrote:
> >
> > Re-adding +datapls-portability-t...@google.com +
> datapls-unified-wor...@google.com
> >
> > On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw 
> wrote:
> >>
> >> Thanks for bringing this to the list. More below.
> >>
> >> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles 
> wrote:
> >>>
> >>> FWIW I deliberately limited the thread to not mix public and private
> lists, so people intending private replies do not accidentally send to
> dev@beam.
> >>>
> >>> I've left them on this time, to avoid contradicting your action, but I
> recommend removing them.
> >>>
> >>> Kenn
> >>>
> >>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik  wrote:
> 
>  Re-adding +datapls-portability-t...@google.com +
> datapls-unified-wor...@google.com
> 
>  On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik 
> wrote:
> >
> > That is correct Kenn. An important point would be that
> SomeOtherCoder would be given a seekable stream (instead of the forward
> only stream it gets right now) so it can either decode all the data or
> lazily decode parts as it needs to as in the case of an iterable coder when
> used to support large iterables coming out of a GroupByKey.
> >
> > On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles 
> wrote:
> >>
> >> Interesting! Having large iterables within rows would be great for
> the interactions between SQL and the core SDK's schema/Row support, and we
> weren't sure how that could work, exactly.
> >>
> >> My (very basic) understanding would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
> followed by the encoding of SomeOtherCoder.
> >>
> >> So the new proposal would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a length
> followed by some number of bytes and if it ends with a special token
> (ignoring escaping issues) then you have to gather bytes from more messages
> in order to assemble a stream to send to SomeOtherCoder? Have I got what
> you mean? So this is a different, yet compatible, approach to sending over
> a special token that has to be looked up separately via the state read API?
> >>
> >> Kenn
> >>
> >> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik 
> wrote:
> >>>
> >>> There is a discussion happening on a PR 7127[1] where Robert is
> working on providing the first implementation for supporting large
> iterables resulting from a GroupByKey. This is in line with the original
> proposal for remote references over the Fn Data & State API[2].
> >>>
> >>> I had thought about this issue more since the original write up
> was done over a year ago and believe that we can simplify the
> implementation by migrating the length prefix coder to be able to embed a
> remote reference token at the end of the stream if the data is too large.
> This allows any coder which supports lazy decoding to return a view over a
> seekable stream instead of decoding all the data (regardless whether all
> the data was sent or there is a state token representing the remote
> reference).
> >>>
> >>> Allowing any arbitrary coder to support lazy decoding helps solve
> the large iterable use case but also opens up the ability for types which
> don't need to be fully decoded to provide lazy views. Imagine our Beam rows
> using a format where only rows that are read are decoded while everything
> else is left in its encoded form.
> >>>
> >>> I also originally thought that this could also help solve an issue
> where large values[3] need to be chunked across multiple protobuf messages
> over the Data API which complicates the reading side decoding
> implementation since each SDK needs to provide an implementation that
> blocks and waits for the next chunk to come across for the same logical
> stream[4]. But there are issues with this because the runner may make a bad
> coder choice such as i

Question about checkpoint logic of the Dataflow Runner

2018-11-28 Thread flyisland
Hi Gurus,

I need to understand the checkpoint logic of the Dataflow Runner, such as when
and how the runner triggers finalization of a checkpoint, and whether the
finalize thread is the same as the reader thread.

Could you share this information with me, or point me to the related source
code? Thanks in advance!

Island Chen


Re: Handling large values

2018-11-28 Thread Robert Bradshaw
On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik  wrote:
>
> Re-adding +datapls-portability-t...@google.com 
> +datapls-unified-wor...@google.com
>
> On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw  wrote:
>>
>> Thanks for bringing this to the list. More below.
>>
>> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles  wrote:
>>>
>>> FWIW I deliberately limited the thread to not mix public and private lists, 
>>> so people intending private replies do not accidentally send to dev@beam.
>>>
>>> I've left them on this time, to avoid contradicting your action, but I 
>>> recommend removing them.
>>>
>>> Kenn
>>>
>>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik  wrote:

 Re-adding +datapls-portability-t...@google.com 
 +datapls-unified-wor...@google.com

 On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik  wrote:
>
> That is correct Kenn. An important point would be that SomeOtherCoder 
> would be given a seekable stream (instead of the forward only stream it 
> gets right now) so it can either decode all the data or lazily decode 
> parts as it needs to as in the case of an iterable coder when used to 
> support large iterables coming out of a GroupByKey.
>
> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles  wrote:
>>
>> Interesting! Having large iterables within rows would be great for the 
>> interactions between SQL and the core SDK's schema/Row support, and we 
>> weren't sure how that could work, exactly.
>>
>> My (very basic) understanding would be that 
>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length 
>> followed by the encoding of SomeOtherCoder.
>>
>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) 
>> has an encoding where it has a length followed by some number of bytes 
>> and if it ends with a special token (ignoring escaping issues) then you 
>> have to gather bytes from more messages in order to assemble a stream to 
>> send to SomeOtherCoder? Have I got what you mean? So this is a 
>> different, yet compatible, approach to sending over a special token that 
>> has to be looked up separately via the state read API?
>>
>> Kenn
>>
>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik  wrote:
>>>
>>> There is a discussion happening on a PR 7127[1] where Robert is working 
>>> on providing the first implementation for supporting large iterables 
>>> resulting from a GroupByKey. This is in line with the original proposal
>>> for remote references over the Fn Data & State API[2].
>>>
>>> I had thought about this issue more since the original write up was 
>>> done over a year ago and believe that we can simplify the 
>>> implementation by migrating the length prefix coder to be able to embed 
>>> a remote reference token at the end of the stream if the data is too 
>>> large. This allows any coder which supports lazy decoding to return a 
>>> view over a seekable stream instead of decoding all the data 
>>> (regardless whether all the data was sent or there is a state token 
>>> representing the remote reference).
>>>
>>> Allowing any arbitrary coder to support lazy decoding helps solve the 
>>> large iterable use case but also opens up the ability for types which 
>>> don't need to be fully decoded to provide lazy views. Imagine our Beam 
>>> rows using a format where only rows that are read are decoded while 
>>> everything else is left in its encoded form.
>>>
>>> I also originally thought that this could also help solve an issue 
>>> where large values[3] need to be chunked across multiple protobuf 
>>> messages over the Data API which complicates the reading side decoding 
>>> implementation since each SDK needs to provide an implementation that 
>>> blocks and waits for the next chunk to come across for the same logical 
>>> stream[4]. But there are issues with this because the runner may make a 
>>> bad coder choice such as iterable> (instead of 
>>> length_prefix>) which can lead to > 2gb of state keys if 
>>> there are many many values.
>>
>>
>> Yes. I think this would need to be a separate coder from the length prefix
>> coder.
>>
>>
>>>
>>> Robert, would implementing the length prefix coder being backed by 
>>> state + adding a lazy decoding method to the iterable coder be 
>>> significantly more complicated than what you are proposing right now?
>>
>>
>> Yes, chopping things up at arbitrary byte boundaries (rather than element 
>> boundaries) tends to be significantly more subtle and complex (based on my 
>> experience with the data plane API). It would also require new public APIs 
>> for Coders.
>
>
> After some further thought, I don't think we need to have a different API for 
> coders, it's just that they get a different implementation for the InputStream
> when decoding. So the logic would be

Re: Handling large values

2018-11-28 Thread Lukasz Cwik
Re-adding +datapls-portability-t...@google.com
 +datapls-unified-wor...@google.com


On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw  wrote:

> Thanks for bringing this to the list. More below.
>
> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles  wrote:
>
>> FWIW I deliberately limited the thread to not mix public and private
>> lists, so people intending private replies do not accidentally send to
>> dev@beam.
>>
>> I've left them on this time, to avoid contradicting your action, but I
>> recommend removing them.
>>
>> Kenn
>>
>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik  wrote:
>>
>>> Re-adding +datapls-portability-t...@google.com
>>>  +datapls-unified-wor...@google.com
>>> 
>>>
>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik  wrote:
>>>
 That is correct Kenn. An important point would be that SomeOtherCoder
 would be given a seekable stream (instead of the forward only stream it
 gets right now) so it can either decode all the data or lazily decode parts
 as it needs to as in the case of an iterable coder when used to support
 large iterables coming out of a GroupByKey.

 On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles 
 wrote:

> Interesting! Having large iterables within rows would be great for the
> interactions between SQL and the core SDK's schema/Row support, and we
> weren't sure how that could work, exactly.
>
> My (very basic) understanding would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
> followed by the encoding of SomeOtherCoder.
>
> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder)
> has an encoding where it has a length followed by some number of bytes and
> if it ends with a special token (ignoring escaping issues) then you have 
> to
> gather bytes from more messages in order to assemble a stream to send to
> SomeOtherCoder? Have I got what you mean? So this is a different, yet
> compatible, approach to sending over a special token that has to be looked
> up separately via the state read API?
>
> Kenn
>
> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik  wrote:
>
>> There is a discussion happening on a PR 7127[1] where Robert is
>> working on providing the first implementation for supporting large
>> iterables resulting from a GroupByKey. This is in line with the original
>> proposal for remote references over the Fn Data & State API[2].
>>
>> I had thought about this issue more since the original write up was
>> done over a year ago and believe that we can simplify the implementation 
>> by
>> migrating the length prefix coder to be able to embed a remote reference
>> token at the end of the stream if the data is too large. This allows any
>> coder which supports lazy decoding to return a view over a seekable 
>> stream
>> instead of decoding all the data (regardless whether all the data was 
>> sent
>> or there is a state token representing the remote reference).
>>
>> Allowing any arbitrary coder to support lazy decoding helps solve the
>> large iterable use case but also opens up the ability for types which 
>> don't
>> need to be fully decoded to provide lazy views. Imagine our Beam rows 
>> using
>> a format where only rows that are read are decoded while everything else 
>> is
>> left in its encoded form.
>>
>> I also originally thought that this could also help solve an issue
>> where large values[3] need to be chunked across multiple protobuf 
>> messages
>> over the Data API which complicates the reading side decoding
>> implementation since each SDK needs to provide an implementation that
>> blocks and waits for the next chunk to come across for the same logical
>> stream[4]. But there are issues with this because the runner may make a 
>> bad
>> coder choice such as iterable> (instead
>> of length_prefix>) which can lead to > 2gb of state keys 
>> if
>> there are many many values.
>>
>
> Yes. I think this would need to be a separate coder from the length prefix
> coder.
>

>> Robert, would implementing the length prefix coder being backed by state +
>> adding a lazy decoding method to the iterable coder be significantly more
>> complicated than what you are proposing right now?
>>
>
> Yes, chopping things up at arbitrary byte boundaries (rather than element
> boundaries) tends to be significantly more subtle and complex (based on my
> experience with the data plane API). It would also require new public APIs
> for Coders.
>

After some further thought, I don't think we need to have a different API
for coders, it's just that they get a different implementation for the
InputStream when decoding. So the logic would be:
public T decode(InputStream is) {
  if (is instanceof SeekableInputStream) {
    return view((SeekableInputStream) is);
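
The snippet above is cut off in the archive. Purely as illustration, the same
branching idea might be fleshed out along the following lines; SeekableInputStream,
view(), and decodeFully() are hypothetical stand-ins rather than Beam classes, and
this is a guess at the shape, not the author's actual code:

import java.io.IOException;
import java.io.InputStream;

// Hypothetical: a stream the runner hands to coders that supports random access.
abstract class SeekableInputStream extends InputStream {
  abstract void seek(long position) throws IOException;
  abstract long size() throws IOException;
}

abstract class LazyCapableCoder<T> {
  // Existing behavior: decode everything eagerly from a forward-only stream.
  abstract T decodeFully(InputStream in) throws IOException;

  // New behavior: wrap the seekable stream in a view that decodes on access.
  abstract T view(SeekableInputStream in) throws IOException;

  public T decode(InputStream in) throws IOException {
    if (in instanceof SeekableInputStream) {
      // Seekable: return a lazy view; bytes are decoded only when the value is read.
      return view((SeekableInputStream) in);
    }
    // Forward-only: fall back to eager decoding, exactly as coders work today.
    return decodeFully(in);
  }
}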

Re: Handling large values

2018-11-28 Thread Robert Bradshaw
Thanks for bringing this to the list. More below.

On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles  wrote:

> FWIW I deliberately limited the thread to not mix public and private
> lists, so people intending private replies do not accidentally send to
> dev@beam.
>
> I've left them on this time, to avoid contradicting your action, but I
> recommend removing them.
>
> Kenn
>
> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik  wrote:
>
>> Re-adding +datapls-portability-t...@google.com
>>  +datapls-unified-wor...@google.com
>> 
>>
>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik  wrote:
>>
>>> That is correct Kenn. An important point would be that SomeOtherCoder
>>> would be given a seekable stream (instead of the forward only stream it
>>> gets right now) so it can either decode all the data or lazily decode parts
>>> as it needs to as in the case of an iterable coder when used to support
>>> large iterables coming out of a GroupByKey.
>>>
>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles 
>>> wrote:
>>>
 Interesting! Having large iterables within rows would be great for the
 interactions between SQL and the core SDK's schema/Row support, and we
 weren't sure how that could work, exactly.

 My (very basic) understanding would be that
 LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
 followed by the encoding of SomeOtherCoder.

 So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder)
 has an encoding where it has a length followed by some number of bytes and
 if it ends with a special token (ignoring escaping issues) then you have to
 gather bytes from more messages in order to assemble a stream to send to
 SomeOtherCoder? Have I got what you mean? So this is a different, yet
 compatible, approach to sending over a special token that has to be looked
 up separately via the state read API?

 Kenn

 On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik  wrote:

> There is a discussion happening on a PR 7127[1] where Robert is
> working on providing the first implementation for supporting large
> iterables resulting from a GroupByKey. This is in line with the original
> proposal for remote references over the Fn Data & State API[2].
>
> I had thought about this issue more since the original write up was
> done over a year ago and believe that we can simplify the implementation 
> by
> migrating the length prefix coder to be able to embed a remote reference
> token at the end of the stream if the data is too large. This allows any
> coder which supports lazy decoding to return a view over a seekable stream
> instead of decoding all the data (regardless whether all the data was sent
> or there is a state token representing the remote reference).
>
> Allowing any arbitrary coder to support lazy decoding helps solve the
> large iterable use case but also opens up the ability for types which 
> don't
> need to be fully decoded to provide lazy views. Imagine our Beam rows 
> using
> a format where only rows that are read are decoded while everything else 
> is
> left in its encoded form.
>
> I also originally thought that this could also help solve an issue
> where large values[3] need to be chunked across multiple protobuf messages
> over the Data API which complicates the reading side decoding
> implementation since each SDK needs to provide an implementation that
> blocks and waits for the next chunk to come across for the same logical
> stream[4]. But there are issues with this because the runner may make a 
> bad
> coder choice such as iterable> (instead
> of length_prefix>) which can lead to > 2gb of state keys if
> there are many many values.
>

Yes. I think this would need to be a separate coder from the length prefix
coder.


> Robert, would implementing the length prefix coder being backed by state +
> adding a lazy decoding method to the iterable coder be significantly more
> complicated than what you are proposing right now?
>

Yes, chopping things up at arbitrary byte boundaries (rather than element
boundaries) tends to be significantly more subtle and complex (based on my
experience with the data plane API). It would also require new public APIs
for Coders.

This is why I went with the more restricted (but still by far the most common,
and quite straightforward) case of supporting arbitrarily large iterables
(which can still occur at any level of nesting, e.g. inside rows), leaving
the general case as future work.


> What do others think about coders supporting a "lazy" decode mode in
> coders?
>
> 1: https://github.com/apache/beam/pull/7127
> 2:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
> 3:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI

Re: Handling large values

2018-11-28 Thread Kenneth Knowles
FWIW I deliberately limited the thread to not mix public and private lists,
so people intending private replies do not accidentally send to dev@beam.

I've left them on this time, to avoid contradicting your action, but I
recommend removing them.

Kenn

On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik  wrote:

> Re-adding +datapls-portability-t...@google.com
>  +datapls-unified-wor...@google.com
> 
>
> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik  wrote:
>
>> That is correct Kenn. An important point would be that SomeOtherCoder
>> would be given a seekable stream (instead of the forward only stream it
>> gets right now) so it can either decode all the data or lazily decode parts
>> as it needs to as in the case of an iterable coder when used to support
>> large iterables coming out of a GroupByKey.
>>
>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles  wrote:
>>
>>> Interesting! Having large iterables within rows would be great for the
>>> interactions between SQL and the core SDK's schema/Row support, and we
>>> weren't sure how that could work, exactly.
>>>
>>> My (very basic) understanding would be that
>>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
>>> followed by the encoding of SomeOtherCoder.
>>>
>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder)
>>> has an encoding where it has a length followed by some number of bytes and
>>> if it ends with a special token (ignoring escaping issues) then you have to
>>> gather bytes from more messages in order to assemble a stream to send to
>>> SomeOtherCoder? Have I got what you mean? So this is a different, yet
>>> compatible, approach to sending over a special token that has to be looked
>>> up separately via the state read API?
>>>
>>> Kenn
>>>
>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik  wrote:
>>>
 There is a discussion happening on a PR 7127[1] where Robert is working
 on providing the first implementation for supporting large iterables
 resulting from a GroupByKey. This is in line with the original proposal for
 remote references over the Fn Data & State API[2].

 I had thought about this issue more since the original write up was
 done over a year ago and believe that we can simplify the implementation by
 migrating the length prefix coder to be able to embed a remote reference
 token at the end of the stream if the data is too large. This allows any
 coder which supports lazy decoding to return a view over a seekable stream
 instead of decoding all the data (regardless whether all the data was sent
 or there is a state token representing the remote reference).

 Allowing any arbitrary coder to support lazy decoding helps solve the
 large iterable use case but also opens up the ability for types which don't
 need to be fully decoded to provide lazy views. Imagine our Beam rows using
 a format where only rows that are read are decoded while everything else is
 left in its encoded form.

 I also originally thought that this could also help solve an issue
 where large values[3] need to be chunked across multiple protobuf messages
 over the Data API which complicates the reading side decoding
 implementation since each SDK needs to provide an implementation that
 blocks and waits for the next chunk to come across for the same logical
 stream[4]. But there are issues with this because the runner may make a bad
 coder choice such as iterable> (instead
 of length_prefix>) which can lead to > 2gb of state keys if
 there are many many values.

 Robert, would implementing the length prefix coder being backed by
 state + adding a lazy decoding method to the iterable coder be
 significantly more complicated than what you are proposing right now?

 What do others think about coders supporting a "lazy" decode mode in
 coders?

 1: https://github.com/apache/beam/pull/7127
 2:
 https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
 3:
 https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
 4:
 https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf

>>>


Re: Handling large values

2018-11-28 Thread Lukasz Cwik
Re-adding +datapls-portability-t...@google.com
 +datapls-unified-wor...@google.com


On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik  wrote:

> That is correct Kenn. An important point would be that SomeOtherCoder
> would be given a seekable stream (instead of the forward only stream it
> gets right now) so it can either decode all the data or lazily decode parts
> as it needs to as in the case of an iterable coder when used to support
> large iterables coming out of a GroupByKey.
>
> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles  wrote:
>
>> Interesting! Having large iterables within rows would be great for the
>> interactions between SQL and the core SDK's schema/Row support, and we
>> weren't sure how that could work, exactly.
>>
>> My (very basic) understanding would be that
>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
>> followed by the encoding of SomeOtherCoder.
>>
>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has
>> an encoding where it has a length followed by some number of bytes and if
>> it ends with a special token (ignoring escaping issues) then you have to
>> gather bytes from more messages in order to assemble a stream to send to
>> SomeOtherCoder? Have I got what you mean? So this is a different, yet
>> compatible, approach to sending over a special token that has to be looked
>> up separately via the state read API?
>>
>> Kenn
>>
>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik  wrote:
>>
>>> There is a discussion happening on a PR 7127[1] where Robert is working
>>> on providing the first implementation for supporting large iterables
>>> resulting from a GroupByKey. This is in line with the original proposal for
>>> remote references over the Fn Data & State API[2].
>>>
>>> I had thought about this issue more since the original write up was done
>>> over a year ago and believe that we can simplify the implementation by
>>> migrating the length prefix coder to be able to embed a remote reference
>>> token at the end of the stream if the data is too large. This allows any
>>> coder which supports lazy decoding to return a view over a seekable stream
>>> instead of decoding all the data (regardless whether all the data was sent
>>> or there is a state token representing the remote reference).
>>>
>>> Allowing any arbitrary coder to support lazy decoding helps solve the
>>> large iterable use case but also opens up the ability for types which don't
>>> need to be fully decoded to provide lazy views. Imagine our Beam rows using
>>> a format where only rows that are read are decoded while everything else is
>>> left in its encoded form.
>>>
>>> I also originally thought that this could also help solve an issue where
>>> large values[3] need to be chunked across multiple protobuf messages over
>>> the Data API which complicates the reading side decoding implementation
>>> since each SDK needs to provide an implementation that blocks and waits for
>>> the next chunk to come across for the same logical stream[4]. But there are
>>> issues with this because the runner may make a bad coder choice such
>>> as iterable> (instead of length_prefix>)
>>> which can lead to > 2gb of state keys if there are many many values.
>>>
>>> Robert, would implementing the length prefix coder being backed by
>>> state + adding a lazy decoding method to the iterable coder be
>>> significantly more complicated than what you are proposing right now?
>>>
>>> What do others think about coders supporting a "lazy" decode mode in
>>> coders?
>>>
>>> 1: https://github.com/apache/beam/pull/7127
>>> 2:
>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>> 3:
>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>> 4:
>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>>>
>>


Re: Handling large values

2018-11-28 Thread Lukasz Cwik
That is correct Kenn. An important point would be that SomeOtherCoder would
be given a seekable stream (instead of the forward only stream it gets
right now) so it can either decode all the data or lazily decode parts as
it needs to as in the case of an iterable coder when used to support large
iterables coming out of a GroupByKey.

On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles  wrote:

> Interesting! Having large iterables within rows would be great for the
> interactions between SQL and the core SDK's schema/Row support, and we
> weren't sure how that could work, exactly.
>
> My (very basic) understanding would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
> followed by the encoding of SomeOtherCoder.
>
> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has
> an encoding where it has a length followed by some number of bytes and if
> it ends with a special token (ignoring escaping issues) then you have to
> gather bytes from more messages in order to assemble a stream to send to
> SomeOtherCoder? Have I got what you mean? So this is a different, yet
> compatible, approach to sending over a special token that has to be looked
> up separately via the state read API?
>
> Kenn
>
> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik  wrote:
>
>> There is a discussion happening on a PR 7127[1] where Robert is working
>> on providing the first implementation for supporting large iterables
>> resulting from a GroupByKey. This is in line with the original proposal for
>> remote references over the Fn Data & State API[2].
>>
>> I had thought about this issue more since the original write up was done
>> over a year ago and believe that we can simplify the implementation by
>> migrating the length prefix coder to be able to embed a remote reference
>> token at the end of the stream if the data is too large. This allows any
>> coder which supports lazy decoding to return a view over a seekable stream
>> instead of decoding all the data (regardless whether all the data was sent
>> or there is a state token representing the remote reference).
>>
>> Allowing any arbitrary coder to support lazy decoding helps solve the
>> large iterable use case but also opens up the ability for types which don't
>> need to be fully decoded to provide lazy views. Imagine our Beam rows using
>> a format where only rows that are read are decoded while everything else is
>> left in its encoded form.
>>
>> I also originally thought that this could also help solve an issue where
>> large values[3] need to be chunked across multiple protobuf messages over
>> the Data API which complicates the reading side decoding implementation
>> since each SDK needs to provide an implementation that blocks and waits for
>> the next chunk to come across for the same logical stream[4]. But there are
>> issues with this because the runner may make a bad coder choice such
>> as iterable> (instead of length_prefix>)
>> which can lead to > 2gb of state keys if there are many many values.
>>
>> Robert, would implementing the length prefix coder being backed by
>> state + adding a lazy decoding method to the iterable coder be
>> significantly more complicated than what you are proposing right now?
>>
>> What do others think about coders supporting a "lazy" decode mode in
>> coders?
>>
>> 1: https://github.com/apache/beam/pull/7127
>> 2:
>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>> 3:
>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>> 4:
>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>>
>


Re: Handling large values

2018-11-28 Thread Kenneth Knowles
Interesting! Having large iterables within rows would be great for the
interactions between SQL and the core SDK's schema/Row support, and we
weren't sure how that could work, exactly.

My (very basic) understanding would be that
LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
followed by the encoding of SomeOtherCoder.

So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has
an encoding where it has a length followed by some number of bytes and if
it ends with a special token (ignoring escaping issues) then you have to
gather bytes from more messages in order to assemble a stream to send to
SomeOtherCoder? Have I got what you mean? So this is a different, yet
compatible, approach to sending over a special token that has to be looked
up separately via the state read API?

Kenn

On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik  wrote:

> There is a discussion happening on a PR 7127[1] where Robert is working on
> providing the first implementation for supporting large iterables resulting
> from a GroupByKey. This is in line with the original proposal for remote
> references over the Fn Data & State API[2].
>
> I had thought about this issue more since the original write up was done
> over a year ago and believe that we can simplify the implementation by
> migrating the length prefix coder to be able to embed a remote reference
> token at the end of the stream if the data is too large. This allows any
> coder which supports lazy decoding to return a view over a seekable stream
> instead of decoding all the data (regardless whether all the data was sent
> or there is a state token representing the remote reference).
>
> Allowing any arbitrary coder to support lazy decoding helps solve the
> large iterable use case but also opens up the ability for types which don't
> need to be fully decoded to provide lazy views. Imagine our Beam rows using
> a format where only rows that are read are decoded while everything else is
> left in its encoded form.
>
> I also originally thought that this could also help solve an issue where
> large values[3] need to be chunked across multiple protobuf messages over
> the Data API which complicates the reading side decoding implementation
> since each SDK needs to provide an implementation that blocks and waits for
> the next chunk to come across for the same logical stream[4]. But there are
> issues with this because the runner may make a bad coder choice such
> as iterable> (instead of length_prefix>)
> which can lead to > 2gb of state keys if there are many many values.
>
> Robert, would implementing the length prefix coder being backed by state +
> adding a lazy decoding method to the iterable coder be significantly more
> complicated than what you are proposing right now?
>
> What do others think about coders supporting a "lazy" decode mode in
> coders?
>
> 1: https://github.com/apache/beam/pull/7127
> 2:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
> 3:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
> 4:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>


Handling large values

2018-11-28 Thread Lukasz Cwik
There is a discussion happening on a PR 7127[1] where Robert is working on
providing the first implementation for supporting large iterables resulting
from a GroupByKey. This is in line with the original proposal for remote
references over the Fn Data & State API[2].

I had thought about this issue more since the original write up was done
over a year ago and believe that we can simplify the implementation by
migrating the length prefix coder to be able to embed a remote reference
token at the end of the stream if the data is too large. This allows any
coder which supports lazy decoding to return a view over a seekable stream
instead of decoding all the data (regardless whether all the data was sent
or there is a state token representing the remote reference).

Allowing any arbitrary coder to support lazy decoding helps solve the large
iterable use case but also opens up the ability for types which don't need
to be fully decoded to provide lazy views. Imagine our Beam rows using a
format where only rows that are read are decoded while everything else is
left in its encoded form.

I also originally thought that this could also help solve an issue where
large values[3] need to be chunked across multiple protobuf messages over
the Data API which complicates the reading side decoding implementation
since each SDK needs to provide an implementation that blocks and waits for
the next chunk to come across for the same logical stream[4]. But there are
issues with this because the runner may make a bad coder choice such
as iterable> (instead of length_prefix>)
which can lead to > 2gb of state keys if there are many many values.

Robert, would implementing the length prefix coder being backed by state +
adding a lazy decoding method to the iterable coder be significantly more
complicated than what you are proposing right now?

What do others think about coders supporting a "lazy" decode mode in coders?

1: https://github.com/apache/beam/pull/7127
2:
https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
3:
https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
4:
https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
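
To make the "view over a seekable stream" idea concrete, here is a toy,
self-contained sketch of a lazily decoded iterable. The in-memory array stands in
for the seekable or state-backed payload and the Function stands in for the
element coder; none of this is Beam API, it only illustrates that nothing is
decoded until the consumer actually iterates:

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

public class LazyIterableSketch {

  static class LazyIterable<T> implements Iterable<T> {
    private final String[] encodedElements;       // stand-in for encoded bytes / remote pages
    private final Function<String, T> decodeOne;  // stand-in for the element coder

    LazyIterable(String[] encodedElements, Function<String, T> decodeOne) {
      this.encodedElements = encodedElements;
      this.decodeOne = decodeOne;
    }

    @Override
    public Iterator<T> iterator() {
      return new Iterator<T>() {
        private int position = 0;  // analogous to a position within the seekable stream

        @Override
        public boolean hasNext() {
          return position < encodedElements.length;
        }

        @Override
        public T next() {
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          // Decode lazily, only when the consumer actually asks for the element.
          return decodeOne.apply(encodedElements[position++]);
        }
      };
    }
  }

  public static void main(String[] args) {
    LazyIterable<Integer> values =
        new LazyIterable<>(new String[] {"1", "2", "3"}, Integer::parseInt);
    for (int v : values) {
      System.out.println(v);  // prints 1, 2, 3, decoding each element on demand
    }
  }
}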


[ANNOUNCEMENT] [SQL] [BEAM-6133] Support for user defined table functions (UDTF)

2018-11-28 Thread Gleb Kanterov
At the moment we support only ScalarFunction UDFs, i.e., functions that
operate on row fields. In Calcite, there are 3 kinds of UDFs: aggregate
functions (that we already support), table macros, and table functions. The
difference between table functions and macros is that macros expand to
relations, and table functions can refer to anything queryable, e.g.,
enumerables. But in the case of Beam SQL, given everything translates to
PTransforms, only table macros are relevant.

UDTFs are in a way similar to external tables but don't require specifying a
schema explicitly. Instead, they can derive a schema based on arguments. One
of the use-cases would be querying ranges of dataset partitions using a
helper function like:

SELECT COUNT(*) FROM table(readAvro(id => 'dataset', start => '2017-01-01',
end => '2018-01-01'))

With BEAM-6133 [1] (apache/beam#7141 [2]) we would have
support for UDTFs in Beam SQL.

[1] https://issues.apache.org/jira/browse/BEAM-6133
[2] https://github.com/apache/beam/pull/7141

Gleb


Re: BigqueryIO field clustering

2018-11-28 Thread Chamikara Jayalath
Thanks for the contribution. I can take a look later this week.

On Wed, Nov 28, 2018 at 12:29 AM Wout Scheepers <
wout.scheep...@vente-exclusive.com> wrote:

> Hey all,
>
>
>
> Almost two weeks ago, I created a PR to support BigQuery clustering [1].
>
> Can someone please have a look?
>
>
>
> Thanks,
>
> Wout
>
>
>
> 1: https://github.com/apache/beam/pull/7061
>
>
>
>
>
> *From: *Lukasz Cwik 
> *Reply-To: *"u...@beam.apache.org" 
> *Date: *Wednesday, 29 August 2018 at 18:32
> *To: *dev , "u...@beam.apache.org" <
> u...@beam.apache.org>
> *Cc: *Bob De Schutter 
> *Subject: *Re: BigqueryIO field clustering
>
>
>
> +dev@beam.apache.org
>
>
>
> Wout, I assigned this task to you since it seems like you're interested in
> contributing.
>
> The Apache Beam contribution guide[1] is a good place to start for
> answering questions on how to contribute.
>
>
>
> If you need help getting stuff reviewed or have questions, feel free
> to reach out on dev@beam.apache.org or on Slack.
>
>
>
> 1: https://beam.apache.org/contribute/
>
>
>
>
>
> On Wed, Aug 29, 2018 at 1:28 AM Wout Scheepers <
> wout.scheep...@vente-exclusive.com> wrote:
>
> Hey all,
>
>
>
> I’m trying to use the field clustering beta feature in bigquery [1].
>
> However, the current Beam/dataflow worker bigquery api service dependency
> is ‘google-api-services-bigquery: com.google.apis: v2-rev374-1.23.0’, which
> does not include the clustering option in the TimePartitioning class.
>
> As a result, I can’t specify the clustering field when loading/streaming into
> bigquery. See [2] for the bigquery api error details.
>
>
>
> Does anyone know a workaround for this?
>
>
>
> I guess that in the worst case I’ll have to wait until Beam supports a
> newer version of the bigquery api service.
>
> After checking the Beam Jira I’ve found BEAM-5191. Is there any way I can
> help to push this forward and make this feature possible in the near future?
>
>
>
> Thanks in advance,
>
> Wout
>
>
>
> [1] https://cloud.google.com/bigquery/docs/clustered-tables
>
> [2] "errorResult" : {
>
>   "message" : "Incompatible table partitioning specification. Expects
> partitioning specification interval(type:day,field:publish_time)
> clustering(clustering_id), but input partitioning specification is
> interval(type:day,field:publish_time)",
>
>   "reason" : "invalid"
>
> }
>
>


Re: contributor in the Beam

2018-11-28 Thread Jean-Baptiste Onofré
Hi,

I already upgraded locally. Let me push the PR.

Regards
JB

On 28/11/2018 16:02, Chaim Turkel wrote:
> is there any reason that the mongo client version is still on 3.2.2?
> can you upgrade it to 3.9.0?
> chaim
> On Tue, Nov 27, 2018 at 4:48 PM Jean-Baptiste Onofré  
> wrote:
>>
>> Hi Chaim,
>>
>> The best is to create a Jira describing the new features you want to
>> add. Then, you can create a PR related to this Jira.
>>
>> As I'm the original MongoDbIO author, I would be more than happy to help
>> you and review the PR.
>>
>> Thanks !
>> Regards
>> JB
>>
>> On 27/11/2018 15:37, Chaim Turkel wrote:
>>> Hi,
>>>   I have added a few features to the MongoDbIO and would like to add
>>> them to the project.
>>> I have read https://beam.apache.org/contribute/
>>> I have added a jira user, what do i need to do next?
>>>
>>> chaim
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: contributor in the Beam

2018-11-28 Thread Chaim Turkel
is there any reason that the mongo client version is still on 3.2.2?
can you upgrade it to 3.9.0?
chaim
On Tue, Nov 27, 2018 at 4:48 PM Jean-Baptiste Onofré  wrote:
>
> Hi Chaim,
>
> The best is to create a Jira describing the new features you want to
> add. Then, you can create a PR related to this Jira.
>
> As I'm the original MongoDbIO author, I would be more than happy to help
> you and review the PR.
>
> Thanks !
> Regards
> JB
>
> On 27/11/2018 15:37, Chaim Turkel wrote:
> > Hi,
> >   I have added a few features to the MongoDbIO and would like to add
> > them to the project.
> > I have read https://beam.apache.org/contribute/
> > I have added a jira user, what do i need to do next?
> >
> > chaim
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com



Re: TextIO setting file dynamically issue

2018-11-28 Thread Jeff Klukas
You can likely achieve what you want using FileIO with dynamic
destinations, which is described in the "Advanced features" section of the
TextIO docs [0].

Your case might look something like:

 PCollection<Event> events = ...;
 events.apply(FileIO.<Event, String>writeDynamic()
   .by(event -> formatAsHHMMSS(event.timestamp))
   .via(TextIO.sink(), Event::toString)
   .to(timeString ->
       nameFilesUsingWindowPaneAndShard("gs:///" + timeString + "/Test", ".txt")));

This assumes the time you care about is part of the data structure you're
trying to write out. Per Reuven's point, if you wanted to use processing
time instead, your by() function could look more like your initial example:

   .by(event -> formatAsHHMMSS(new DateTime()))


[0]
https://beam.apache.org/releases/javadoc/2.8.0/index.html?org/apache/beam/sdk/io/TextIO.html#advanced-features

On Tue, Nov 27, 2018 at 6:48 PM Lukasz Cwik  wrote:

> +u...@beam.apache.org 
>
> On Mon, Nov 26, 2018 at 5:33 PM Reuven Lax  wrote:
>
>> Do you need it to change based on the timestamps of the records being
>> processed, or based on actual current time?
>>
>> On Mon, Nov 26, 2018 at 5:30 PM Matthew Schneid <
>> matthew.t.schn...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>>
>>>
>>> I have an interesting issue that I can’t seem to find a reliable
>>> resolution too.
>>>
>>>
>>>
>>> I have a standard TextIO output that looks like the following:
>>>
>>>
>>>
>>> TextIO.*write*().to("gs://+ new DateTime().toString("HH-mm-ss") 
>>> + "/Test-")
>>>
>>>
>>>
>>> The above works, and writes to GSC, as I expect it too.
>>>
>>>
>>>
>>> However, it retains the instantiated datetime value, and what I need to
>>> happen is for it to dynamically change with the current time.
>>>
>>>
>>>
>>> Is this possible?
>>>
>>>
>>>
>>> Thanks for any and all help that can be provided.
>>>
>>>
>>>
>>> V/R
>>>
>>>
>>>
>>> MS
>>>
>>


Re: contributor in the Beam

2018-11-28 Thread Chaim Turkel
i have created the pull request:
https://github.com/apache/beam/pull/7148
On Tue, Nov 27, 2018 at 4:48 PM Jean-Baptiste Onofré  wrote:
>
> Hi Chaim,
>
> The best is to create a Jira describing the new features you want to
> add. Then, you can create a PR related to this Jira.
>
> As I'm the original MongoDbIO author, I would be more than happy to help
> you and review the PR.
>
> Thanks !
> Regards
> JB
>
> On 27/11/2018 15:37, Chaim Turkel wrote:
> > Hi,
> >   I have added a few features to the MongoDbIO and would like to add
> > them to the project.
> > I have read https://beam.apache.org/contribute/
> > I have added a jira user, what do i need to do next?
> >
> > chaim
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com



Re: [FEEDBACK REQUEST] Re: [ANNOUNCEMENT] Nexmark included to the CI

2018-11-28 Thread Etienne Chauchot
Hi Alex,

Exporting results to the dashboards is as easy as writing to a BigQuery table
and then configuring the dashboard SQL request to display it. Here is an example:

- exporting:
https://github.com/apache/beam/blob/ad150c1d654aac5720975727d8c6981c5382b449/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java#L163
- displaying:

SELECT DATE(timestamp) as date, runtimeSec
FROM [apache-beam-testing:nexmark.nexmark_0_DirectRunner_batch]
WHERE timestamp >= TIMESTAMP_TO_SEC(DATE_ADD(CURRENT_TIMESTAMP(), -2, "WEEK"))
ORDER BY date;

Best
Etienne
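
For the exporting side, a minimal standalone sketch of such a write using the
google-cloud-bigquery client (this is not the code in Main.java linked above; the
project, dataset, table, and field names are taken from the SQL example and the
table schema is assumed):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.Map;

public class PublishNexmarkResultSketch {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableId table = TableId.of("apache-beam-testing", "nexmark", "nexmark_0_DirectRunner_batch");

    // One benchmark result row; field names match the dashboard query above.
    Map<String, Object> row = new HashMap<>();
    row.put("timestamp", System.currentTimeMillis() / 1000.0);  // assumed TIMESTAMP column
    row.put("runtimeSec", 12.3);                                // assumed FLOAT column

    InsertAllResponse response =
        bigquery.insertAll(InsertAllRequest.newBuilder(table).addRow(row).build());
    if (response.hasErrors()) {
      System.err.println("Insert failed: " + response.getInsertErrors());
    }
  }
}
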
Le mardi 27 novembre 2018 à 17:34 -0800, Alex Amato a écrit :
> It would be great to add some lower level benchmark tests for the java SDK. I 
> was thinking of using open census for
> collecting benchmarks, which looks easy to use should be license compatible. 
> I'm just not sure about how to export the
> results so that we can display them on the perfkit dashboard for everyone to 
> see.
> 
> Is there an example PR for this part? Can we write to this data store for 
> this perfkit dashboard easily?
> 
> https://github.com/census-instrumentation/opencensus-java
> https://github.com/census-instrumentation/opencensus-java/tree/master/exporters/trace/zipkin#quickstart
> 
> 
> 
> 
> On Thu, Jul 19, 2018 at 1:28 PM Andrew Pilloud  wrote:
> > The doc changes look good to me, I'll add Dataflow once it is ready. Thanks 
> > for opening the issue on the
> > DirectRunner. I'll try to get some progress on a dedicated perf node while 
> > you are gone, we can talk about
> > increasing the size of the nexmark input collection for the runs once we 
> > know what the utilization on that looks
> > like.
> > Enjoy your time off!
> > 
> > Andrew
> > On Thu, Jul 19, 2018 at 9:00 AM Etienne Chauchot  
> > wrote:
> > > Hi guys,As suggested by Anton bellow, I opened a PR on the website to 
> > > reference the Nexmark dashboards. As I did
> > > not want users to take them for proper neutral benchmarks of the runners 
> > > / engines,  but more for a CI piece of
> > > software, I added a disclaimer.
> > > Please:- tell if you agree on  the publication of such performance 
> > > results- comment on the PR for the disclaimer.
> > > PR: https://github.com/apache/beam-site/pull/500
> > > 
> > > Thanks
> > > Etienne
> > > 
> > > Le jeudi 19 juillet 2018 à 12:30 +0200, Etienne Chauchot a écrit :
> > > > Hi Anton, 
> > > > Yes, good idea, I'll update nexmark website page
> > > > Etienne
> > > > Le mercredi 18 juillet 2018 à 10:17 -0700, Anton Kedin a écrit :
> > > > > These dashboards look great!
> > > > > 
> > > > > Can publish the links to the dashboards somewhere, for better 
> > > > > visibility? E.g. in the jenkins website /
> > > > > emails, or the wiki.
> > > > > 
> > > > > Regards,Anton
> > > > > On Wed, Jul 18, 2018 at 10:08 AM Andrew Pilloud  
> > > > > wrote:
> > > > > > Hi Etienne,
> > > > > > 
> > > > > > I've been asking around and it sounds like we should be able to get 
> > > > > > a dedicated Jenkins node for performance
> > > > > > tests. Another thing that might help is making the runs a few times 
> > > > > > longer. They are currently running
> > > > > > around 2 seconds each, so the total time of the build probably 
> > > > > > exceeds testing. Internally at Google we are
> > > > > > running them with 2000x as many events on Dataflow, but a job of 
> > > > > > that size won't even complete on the Direct
> > > > > > Runner.
> > > > > > I didn't see the query 3 issues, but now that you point it out it 
> > > > > > looks like a bug to me too.
> > > > > > 
> > > > > > Andrew
> > > > > > On Wed, Jul 18, 2018 at 1:13 AM Etienne Chauchot 
> > > > > >  wrote:
> > > > > > > Hi Andrew,
> > > > > > > Yes I saw that, except dedicating jenkins nodes to nexmark, I see 
> > > > > > > no other way.
> > > > > > > Also, did you see query 3 output size on direct runner? Should be 
> > > > > > > a straight line and it is not, I'm
> > > > > > > wondering if there is a problem with sate and timers impl in 
> > > > > > > direct runner.
> > > > > > > Etienne
> > > > > > > Le mardi 17 juillet 2018 à 11:38 -0700, Andrew Pilloud a écrit :
> > > > > > > > I'm noticing the graphs are really noisy. It looks like we are 
> > > > > > > > running these on shared Jenkins
> > > > > > > > executors, so our perf tests are fighting with other builds for 
> > > > > > > > CPU. I've opened an issue 
> > > > > > > > https://issues.apache.org/jira/browse/BEAM-4804 and am 
> > > > > > > > wondering if anyone knows an easy fix to isolate
> > > > > > > > these jobs.
> > > > > > > > Andrew
> > > > > > > > On Fri, Jul 13, 2018 at 2:39 AM Łukasz Gajowy 
> > > > > > > >  wrote:
> > > > > > > > > @Etienne: Nice to see the graphs! :)
> > > > > > > > > 
> > > > > > > > > @Ismael: Good idea, there's no document yet. I think we could 
> > > > > > > > > create a small google doc with
> > > > > > > > > instructions on how to do this.
> > > > > > > > > 
> > > > > > > > > pt., 13 lip 2018 o 10:46 Etien

Re: BigqueryIO field clustering

2018-11-28 Thread Wout Scheepers
Hey all,

Almost two weeks ago, I created a PR to support BigQuery clustering [1].
Can someone please have a look?

Thanks,
Wout

1: https://github.com/apache/beam/pull/7061
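
For readers following along, a rough sketch of the kind of usage the PR aims to
enable; the withClustering option and the exact model classes here are assumptions
based on the PR under review, not a released API:

import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;

public class ClusteredWriteSketch {
  static void write(PCollection<TableRow> rows) {
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")
            // Day-partition on publish_time, matching the partitioning in the error below.
            .withTimePartitioning(
                new TimePartitioning().setType("DAY").setField("publish_time"))
            // Hypothetical: the clustering option the PR adds, clustering on clustering_id.
            .withClustering(new Clustering().setFields(Arrays.asList("clustering_id")))
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
  }
}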


From: Lukasz Cwik 
Reply-To: "u...@beam.apache.org" 
Date: Wednesday, 29 August 2018 at 18:32
To: dev , "u...@beam.apache.org" 
Cc: Bob De Schutter 
Subject: Re: BigqueryIO field clustering

+dev@beam.apache.org

Wout, I assigned this task to you since it seems like you're interested in
contributing.
The Apache Beam contribution guide[1] is a good place to start for answering 
questions on how to contribute.

If you need help getting stuff reviewed or have questions, feel free to
reach out on dev@beam.apache.org or on Slack.

1: https://beam.apache.org/contribute/


On Wed, Aug 29, 2018 at 1:28 AM Wout Scheepers 
mailto:wout.scheep...@vente-exclusive.com>> 
wrote:
Hey all,

I’m trying to use the field clustering beta feature in bigquery [1].
However, the current Beam/dataflow worker bigquery api service dependency is 
‘google-api-services-bigquery: com.google.apis: v2-rev374-1.23.0’, which does 
not include the clustering option in the TimePartitioning class.
As a result, I can’t specify the clustering field when loading/streaming into
bigquery. See [2] for the bigquery api error details.

Does anyone know a workaround for this?

I guess that in the worst case I’ll have to wait until Beam supports a newer 
version of the bigquery api service.
After checking the Beam Jira I’ve found BEAM-5191. Is there any way I
can help to push this forward and make this feature possible in the near future?

Thanks in advance,
Wout

[1] https://cloud.google.com/bigquery/docs/clustered-tables
[2] "errorResult" : {
  "message" : "Incompatible table partitioning specification. Expects 
partitioning specification interval(type:day,field:publish_time) 
clustering(clustering_id), but input partitioning specification is 
interval(type:day,field:publish_time)",
  "reason" : "invalid"
}