Re: [DISCUSS] Portability representation of schemas

2019-06-13 Thread Reuven Lax
As Luke mentioned above, we don't need to add a new mapping transform. We
can simply create a wrapping coder that wraps the Java coder.
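
(For concreteness, such a wrapping coder could look roughly like the sketch below. The class
and constructor are illustrative only, not the actual SchemaCoder implementation: the bytes on
the wire are just the portable row encoding, while the to/from conversion functions never leave
the Java SDK.)

  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
  import org.apache.beam.sdk.coders.CustomCoder;
  import org.apache.beam.sdk.coders.RowCoder;
  import org.apache.beam.sdk.schemas.Schema;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.beam.sdk.values.Row;

  // Sketch only: encodes T via the schema's row encoding; other SDKs see plain rows.
  class RowWrappingCoder<T> extends CustomCoder<T> {
    private final RowCoder rowCoder;
    private final SerializableFunction<T, Row> toRow;
    private final SerializableFunction<Row, T> fromRow;

    RowWrappingCoder(
        Schema schema, SerializableFunction<T, Row> toRow, SerializableFunction<Row, T> fromRow) {
      this.rowCoder = RowCoder.of(schema);
      this.toRow = toRow;
      this.fromRow = fromRow;
    }

    @Override
    public void encode(T value, OutputStream out) throws IOException {
      rowCoder.encode(toRow.apply(value), out); // wire bytes are row bytes, readable by any SDK
    }

    @Override
    public T decode(InputStream in) throws IOException {
      return fromRow.apply(rowCoder.decode(in)); // the typed view is a Java-only convenience
    }
  }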

On Thu, Jun 13, 2019 at 4:32 PM Brian Hulette  wrote:

> Yes that's pretty much what I had in mind. The one point I'm unsure about
> is that I was thinking the *calling* SDK would need to insert the transform
> to convert to/from Rows (unless it's an SDK that uses the portable
> SchemaCoder everywhere and doesn't need a conversion). For example, python
> might do this in ExternalTransform's expand function [1]. I was thinking
> that an expansion service would only serve transforms that operate on
> PCollections with standard coders, so you wouldn't need a conversion there,
> but maybe I'm mistaken.
>
> Either way, you've captured the point: I think we could provide the
> niceties of the Java Schema API, without including anything SDK-specific in
> the portable representation of SchemaCoder, by having one JavaSchemaCoder
> and one PortableSchemaCoder that we can convert between, transparently to the
> user.
>
> I put up a PR [2] that updates the Schema representation based on Kenn's
> "type-constructor based" alternative, and uses it in Java's
> SchemaTranslation. It doesn't actually touch any of the coders yet, they're
> all still just implemented as custom coders.
>
> [1]
> https://github.com/apache/beam/blob/4c322107ca5ebc0ab1cc6581d957501fd3ed9cc4/sdks/python/apache_beam/transforms/external.py#L44
> [2] https://github.com/apache/beam/pull/8853
>
> On Thu, Jun 13, 2019 at 11:42 AM Reuven Lax  wrote:
>
>> Spoke to Brian about his proposal. It is essentially this:
>>
>> We create PortableSchemaCoder, with a well-known URN. This coder is
>> parameterized by the schema (i.e. list of field name -> field type pairs).
>>
>> Java also continues to have its own CustomSchemaCoder. This is
>> parameterized by the schema as well as the to/from functions needed to make
>> the Java API "nice."
>>
>> When the expansion service expands a Java PTransform for usage across
>> languages, it will add a transform mapping the PCollection with
>> CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way
>> Java can keep the information needed to maintain its API (and Python
>> can do the same), but there's no need to shove this information into the
>> well-known portable representation.
>>
>> Brian, can you confirm that this was your proposal? If so, I like it.
>>
>> We've gone back and forth discussing this in the abstract for over a month now. I
>> suggest that the next step should be to create a PR, and move discussion to
>> that PR. Having actual code can often make discussion much more concrete.
>>
>> Reuven
>>
>> On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw 
>> wrote:
>>
>>> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax  wrote:
>>>

 On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles 
 wrote:

> Can we choose a first step? I feel there's consensus around:
>
>  - the basic idea of what a schema looks like, ignoring logical types
> or SDK-specific bits
>  - the version of logical type which is a standardized URN+payload
> plus a representation
>
> Perhaps we could commit this and see what it looks like to try to use
> it?
>

>>> +1
>>>
>>>
 It also seems like there might be consensus around the idea of each of:
>
>  - a coder that simply encodes rows; its payload is just a schema; it
> is minimalist, canonical
>
  - a coder that encodes a non-row using the serialization format of a
> row; this has to be a coder (versus Convert transforms) so that to/from 
> row
> conversions can be elided when primitives are fused (just like to/from
> bytes is elided)
>

>>> So, to make it concrete, in the Beam protos we would have an
>>> [Elementwise]SchemaCoder whose single parameterization would be FieldType,
>>> whose definition is in terms of URN + payload + components (+
>>> representation, for non-primitive types, some details TBD there). It could
>>> be deserialized into various different Coder instances (an SDK
>>> implementation detail) in an SDK depending on the type. One of the most
>>> important primitive field types is Row (aka Struct).
>>>
>>> We would define a byte encoding for each primitive type. We *could*
>>> choose to simply require that the encoding of any non-row primitive is the
>>> same as its encoding in a single-member row, but that's not necessary.
>>>
>>> In the short term, the window/timestamp/pane info would still live
>>> outside via an enclosing WindowCoder, as it does now, not blocking on a
>>> desirable but still-to-be-figured-out unification at that level.
>>>
>>> This seems like a good path forward.
>>>
>>> Actually this doesn't make sense to me. I think from the portability
 perspective, all we have is schemas - the rest is just a convenience for
 the SDK. As such, I don't think it makes sense at all to model this as a
 Coder.

>>>
>>> Coders and schemas are mutually exclusive on PCollections, and completely specify type information, so I think it makes sense to reuse this (as we're currently doing) until we can get rid of coders altogether ...

Re: [DISCUSS] Portability representation of schemas

2019-06-13 Thread Brian Hulette
Yes that's pretty much what I had in mind. The one point I'm unsure about
is that I was thinking the *calling* SDK would need to insert the transform
to convert to/from Rows (unless it's an SDK that uses the portable
SchemaCoder everywhere and doesn't need a conversion). For example, python
might do this in ExternalTransform's expand function [1]. I was thinking
that an expansion service would only serve transforms that operate on
PCollections with standard coders, so you wouldn't need a conversion there,
but maybe I'm mistaken.

Either way, you've captured the point: I think we could provide the
niceties of the Java Schema API, without including anything SDK-specific in
the portable representation of SchemaCoder, by having one JavaSchemaCoder
and one PortableSchemaCoder that we can convert between, transparently to the
user.

I put up a PR [2] that updates the Schema representation based on Kenn's
"type-constructor based" alternative, and uses it in Java's
SchemaTranslation. It doesn't actually touch any of the coders yet, they're
all still just implemented as custom coders.

[1]
https://github.com/apache/beam/blob/4c322107ca5ebc0ab1cc6581d957501fd3ed9cc4/sdks/python/apache_beam/transforms/external.py#L44
[2] https://github.com/apache/beam/pull/8853

On Thu, Jun 13, 2019 at 11:42 AM Reuven Lax  wrote:

> Spoke to Brian about his proposal. It is essentially this:
>
> We create PortableSchemaCoder, with a well-known URN. This coder is
> parameterized by the schema (i.e. list of field name -> field type pairs).
>
> Java also continues to have its own CustomSchemaCoder. This is
> parameterized by the schema as well as the to/from functions needed to make
> the Java API "nice."
>
> When the expansion service expands a Java PTransform for usage across
> languages, it will add a transform mapping the PCollection with
> CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way
> Java can keep the information needed to maintain its API (and Python
> can do the same), but there's no need to shove this information into the
> well-known portable representation.
>
> Brian, can you confirm that this was your proposal? If so, I like it.
>
> We've gone back and forth discussing this in the abstract for over a month now. I
> suggest that the next step should be to create a PR, and move discussion to
> that PR. Having actual code can often make discussion much more concrete.
>
> Reuven
>
> On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw 
> wrote:
>
>> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax  wrote:
>>
>>>
>>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles  wrote:
>>>
 Can we choose a first step? I feel there's consensus around:

  - the basic idea of what a schema looks like, ignoring logical types
 or SDK-specific bits
  - the version of logical type which is a standardized URN+payload plus
 a representation

 Perhaps we could commit this and see what it looks like to try to use
 it?

>>>
>> +1
>>
>>
>>> It also seems like there might be consensus around the idea of each of:

  - a coder that simply encodes rows; its payload is just a schema; it
 is minimalist, canonical

>>>  - a coder that encodes a non-row using the serialization format of a
 row; this has to be a coder (versus Convert transforms) so that to/from row
 conversions can be elided when primitives are fused (just like to/from
 bytes is elided)

>>>
>> So, to make it concrete, in the Beam protos we would have an
>> [Elementwise]SchemaCoder whose single parameterization would be FieldType,
>> whose definition is in terms of URN + payload + components (+
>> representation, for non-primitive types, some details TBD there). It could
>> be deserialized into various different Coder instances (an SDK
>> implementation detail) in an SDK depending on the type. One of the most
>> important primitive field types is Row (aka Struct).
>>
>> We would define a byte encoding for each primitive type. We *could*
>> choose to simply require that the encoding of any non-row primitive is the
>> same as its encoding in a single-member row, but that's not necessary.
>>
>> In the short term, the window/timestamp/pane info would still live
>> outside via an enclosing WindowCoder, as it does now, not blocking on a
>> desirable but still-to-be-figured-out unification at that level.
>>
>> This seems like a good path forward.
>>
>> Actually this doesn't make sense to me. I think from the portability
>>> perspective, all we have is schemas - the rest is just a convenience for
>>> the SDK. As such, I don't think it makes sense at all to model this as a
>>> Coder.
>>>
>>
>> Coders and schemas are mutually exclusive on PCollections, and completely
>> specify type information, so I think it makes sense to reuse this (as we're
>> currently doing) until we can get rid of coders altogether.
>>
>> (At execution time, we would generalize the notion of a coder to indicate
>> how *batches* of elements are encoded, not just how individual elements are encoded ...

Re: SparkRunner Combine.perKey performance

2019-06-13 Thread Jan Lukavský


On 6/13/19 6:10 PM, Robert Bradshaw wrote:
On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský wrote:


On 6/13/19 4:31 PM, Robert Bradshaw wrote:

The comment fails to take into account the asymmetry between
calling addInput vs. mergeAccumulators. It also focuses a lot on
the asymptotic behavior, when the most common behavior is likely
having a single (global) window.


Yes, that occurred to me too. There are more questions here:

 a) it would help if WindowedValue#explodeWindows would return
List instead of Iterable, because then optimizations would be
possible based on the number of windows (e.g. when there is only a
single window, there is no need to sort anything). This should be a
simple change, as it already is a List.

Well, I'm wary of this change, but we could always create a list out 
of it (via cast or ImmutableList.copyOf) if we needed.
Why so? I thought this would be the least objectionable change, because 
it actually *is* a List, and there is no interface involved; it is just a 
public method that needs to be changed to state the fact correctly. A 
Collection would be the same. Iterable is for cases where you don't 
know whether the data is stored in memory or is loaded from somewhere 
else, and hence the size cannot be determined in advance. This is not 
the case for WindowFn.


 b) I'm a little confused why it is better to keep key with all
its windows in single WindowedValue in general. My first thoughts
would really be - explode all windows to (key, window) pairs, and
use these as keys as much as you can - there is an obvious small
drawback: this might be less efficient for windowing strategies
with high number of windows per element (sliding windows with
small slide compared to window length). But that could be added to
the WindowFn and decision could be made accordingly.

The optimization is for sliding windows, where it's more efficient to 
send the value with all its windows and explode after the shuffle 
rather than duplicate the value before. Of course this breaks that 
ability with the hopes of being able to reduce the size more by doing 
combining. (A half-way approach would be to group on unexploded window 
set, which sounds worse generically but isn't bad in practice.)
I'd say that adding something like WindowFn.isAssigningToSingleWindow() 
would solve all the nuances.



Were I to implement this I would let the accumulator be a hashmap
Window -> Value/Timestamp. For the non-merging, when a
WindowedValue with N windows comes in, simply do O(N)
lookup+addInput calls. Merging these accumulator hashmaps is
pretty easy. For merging windows, I would first invoke the
WindowFn to determine which old windows + new windows merged into
bigger windows, and then construct the values of the bigger
windows with the appropriate createAccumulator, addInput, and/or
mergeAccumulators calls, depending on how many old vs. new values
there are. This is a merging call + O(N) additions.

Yep, that sounds like it could work. For a single window (global,
tumbling) the approach above would probably still be more efficient.


Yes, for the global window one could do away with the (single-valued) 
hashmap. Sessions.mergeWindows() does the tumbling to figure out which 
windows to merge, so that'd be just as efficient.

+1




BTW, the code is broken because it hard codes SessionWindows
rather than calling WindowFn.mergeWindows(...). This is a
correctness, not just a performance, bug :(.

Can you point it out to me in the code?


E.g. 
https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106 always 
merges things that are intersecting, rather than 
querying WindowFn.mergeWindows to determine which, if any, should be 
merged.



On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <je...@seznam.cz> wrote:

Hi Robert,

there is a comment around that which states that the current
solution should be more efficient. I'd say that (for
non-merging windows) it would be best to first explode
windows, and only after that do combineByKey(key & window).
Merging windows would have to be handled the way it is, or
maybe it would be better to split this into

 1) assign windows to elements

 2) combineByKeyAndWindow

Jan

On 6/13/19 3:51 PM, Robert Bradshaw wrote:

I think the problem is that it never leverages the
(traditionally much cheaper)
CombineFn.addInput(old_accumulator, new_value). Instead, it
always calls CombineFn.mergeAccumulators(old_accumulator,
CombineFn.addInput(CombineFn.createAccumulator(),
new_value)). It should be feasible to fix this while still
handling windowing correctly. (The end-of-window timestamp
combiner could also be optimized because the timestamp need not be tracked throughout in that case.)

Re: [DISCUSS] Portability representation of schemas

2019-06-13 Thread Reuven Lax
Spoke to Brian about his proposal. It is essentially this:

We create PortableSchemaCoder, with a well-known URN. This coder is
parameterized by the schema (i.e. list of field name -> field type pairs).

Java also continues to have its own CustomSchemaCoder. This is
parameterized by the schema as well as the to/from functions needed to make
the Java API "nice."

When the expansion service expands a Java PTransform for usage across
languages, it will add a transform mapping the PCollection with
CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way
Java can keep the information needed to maintain its API (and Python
can do the same), but there's no need to shove this information into the
well-known portable representation.
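
(To make the boundary step concrete, a rough sketch of the conversion the expansion service
would insert is below. Convert.toRows() stands in here for whatever row-conversion step is
actually used, and the helper name is made up; the point is only that the output PCollection
carries nothing but the schema and the standard row encoding.)

  import org.apache.beam.sdk.schemas.transforms.Convert;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.Row;

  class ExpansionBoundary {
    // The input must already carry a schema (Java's SchemaCoder with its to/from functions);
    // the output is rows with the same schema, i.e. the portable representation that crosses
    // the language boundary.
    static <T> PCollection<Row> toPortableRows(PCollection<T> javaOutput) {
      return javaOutput.apply("ToPortableRows", Convert.toRows());
    }
  }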

Brian, can you confirm that this was your proposal? If so, I like it.

We've gone back and forth discussing this in the abstract for over a month now. I
suggest that the next step should be to create a PR, and move discussion to
that PR. Having actual code can often make discussion much more concrete.

Reuven

On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw  wrote:

> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax  wrote:
>
>>
>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles  wrote:
>>
>>> Can we choose a first step? I feel there's consensus around:
>>>
>>>  - the basic idea of what a schema looks like, ignoring logical types or
>>> SDK-specific bits
>>>  - the version of logical type which is a standardized URN+payload plus
>>> a representation
>>>
>>> Perhaps we could commit this and see what it looks like to try to use it?
>>>
>>
> +1
>
>
>> It also seems like there might be consensus around the idea of each of:
>>>
>>>  - a coder that simply encodes rows; its payload is just a schema; it is
>>> minimalist, canonical
>>>
>>  - a coder that encodes a non-row using the serialization format of a
>>> row; this has to be a coder (versus Convert transforms) so that to/from row
>>> conversions can be elided when primitives are fused (just like to/from
>>> bytes is elided)
>>>
>>
> So, to make it concrete, in the Beam protos we would have an
> [Elementwise]SchemaCoder whose single parameterization would be FieldType,
> whose definition is in terms of URN + payload + components (+
> representation, for non-primitive types, some details TBD there). It could
> be deserialized into various different Coder instances (an SDK
> implementation detail) in an SDK depending on the type. One of the most
> important primitive field types is Row (aka Struct).
>
> We would define a byte encoding for each primitive type. We *could* choose
> to simply require that the encoding of any non-row primitive is the same as
> its encoding in a single-member row, but that's not necessary.
>
> In the short term, the window/timestamp/pane info would still live outside
> via an enclosing WindowCoder, as it does now, not blocking on a desirable
> but still-to-be-figured-out unification at that level.
>
> This seems like a good path forward.
>
> Actually this doesn't make sense to me. I think from the portability
>> perspective, all we have is schemas - the rest is just a convenience for
>> the SDK. As such, I don't think it makes sense at all to model this as a
>> Coder.
>>
>
> Coders and schemas are mutually exclusive on PCollections, and completely
> specify type information, so I think it makes sense to reuse this (as we're
> currently doing) until we can get rid of coders altogether.
>
> (At execution time, we would generalize the notion of a coder to indicate
> how *batches* of elements are encoded, not just how individual elements are
> encoded. Here we have the option of letting the runner pick depending on
> the use (e.g. elementwise for key lookups vs. arrow for bulk data channel
> transfer vs ???, possibly with parameters like "preferred batch size") or
> standardizing on one physical byte representation for all communication
> over the boundary.)
>
>
>>
>>
>>>
>>> Can we also just have both of these, with different URNs?
>>>
>>> Kenn
>>>
>>> On Wed, Jun 12, 2019 at 3:57 PM Reuven Lax  wrote:
>>>


 On Wed, Jun 12, 2019 at 3:46 PM Robert Bradshaw 
 wrote:

> On Tue, Jun 11, 2019 at 8:04 PM Kenneth Knowles 
> wrote:
>
>>
>> I believe the schema registry is a transient construction-time
>> concept. I don't think there's any need for a concept of a registry in 
>> the
>> portable representation.
>>
>> I'd rather urn:beam:schema:logicaltype:javasdk not be used whenever
>>> one has (say) a Java POJO as that would prevent other SDKs from
>>> "understanding" it as above (unless we had a way of declaring it as 
>>> "just
>>> an alias/wrapper").
>>>
>>
>> I didn't understand the example I snipped, but I think I understand
>> your concern here. Is this what you want? (a) something presented as a 
>> POJO
>> in Java (b) encoded to a row, but still decoded to the POJO and (c)
>> non-Java SDK knows that it is "just a struct" so it is safe to mess about with or even create new ones ...

Re: SparkRunner Combine.perKey performance

2019-06-13 Thread Robert Bradshaw
On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský  wrote:

> On 6/13/19 4:31 PM, Robert Bradshaw wrote:
>
> The comment fails to take into account the asymmetry between calling
> addInput vs. mergeAccumulators. It also focuses a lot on the asymptotic
> behavior, when the most common behavior is likely having a single (global)
> window.
>
> Yes, that occurred to me too. There are more questions here:
>
>  a) it would help if WindowedValue#explodeWindows would return List
> instead of Iterable, because then optimizations would be possible based on
> the number of windows (e.g. when there is only a single window, there is no need
> to sort anything). This should be a simple change, as it already is a List.
>
Well, I'm wary of this change, but we could always create a list out of it
(via cast or ImmutableList.copyOf) if we needed.

>  b) I'm a little confused why it is better to keep key with all its
> windows in single WindowedValue in general. My first thoughts would really
> be - explode all windows to (key, window) pairs, and use these as keys as
> much as you can - there is an obvious small drawback: this might be less
> efficient for windowing strategies with high number of windows per element
> (sliding windows with small slide compared to window length). But that
> could be added to the WindowFn and decision could be made accordingly.
>
The optimization is for sliding windows, where it's more efficient to send
the value with all its windows and explode after the shuffle rather than
duplicate the value before. Of course this breaks that ability with the
hopes of being able to reduce the size more by doing combining. (A half-way
approach would be to group on unexploded window set, which sounds worse
generically but isn't bad in practice.)

> Were I to implement this I would let the accumulator be a hashmap Window
> -> Value/Timestamp. For the non-merging, when a WindowedValue with N
> windows comes in, simply do O(N) lookup+addInput calls. Merging these
> accumulator hashmaps is pretty easy. For merging windows, I would first
> invoke the WindowFn to determine which old windows + new windows merged
> into bigger windows, and then construct the values of the bigger windows
> with the appropriate createAccumulator, addInput, and/or mergeAccumulators
> calls, depending on how many old vs. new values there are. This is a
> merging call + O(N) additions.
>
> Yep, that sounds like it could work. For a single window (global, tumbling)
> the approach above would probably still be more efficient.
>

Yes, for the global window one could do away with the (single-valued)
hashmap. Sessions.mergeWindows() does the tumbling to figure out which
windows to merge, so that'd be just as efficient.

>
> BTW, the code is broken because it hard codes SessionWindows rather than
> calling WindowFn.mergeWindows(...). This is a correctness, not just a
> performance, bug :(.
>
> Can you point it out to me in the code?
>

E.g.
https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106
always
merges things that are intersecting, rather than
querying WindowFn.mergeWindows to determine which, if any, should be
merged.
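
(A rough sketch of what querying the WindowFn would look like, rather than assuming that
intersecting windows merge; the class and method names are illustrative, and the accumulator
bookkeeping is only described in comments.)

  import java.util.Collection;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.transforms.windowing.WindowFn;

  class MergeViaWindowFn {
    // Only meaningful when windowFn.isNonMerging() is false (e.g. Sessions, or any custom
    // merging WindowFn). The WindowFn decides which windows merge; "intersecting windows
    // merge" happens to be the Sessions policy but is not true in general.
    static <W extends BoundedWindow> void mergeWindows(
        WindowFn<Object, W> windowFn, Collection<W> currentWindows) throws Exception {
      windowFn.mergeWindows(
          windowFn.new MergeContext() {
            @Override
            public Collection<W> windows() {
              return currentWindows;
            }

            @Override
            public void merge(Collection<W> toBeMerged, W mergeResult) {
              // Here the combine logic would mergeAccumulators() for toBeMerged and store the
              // result under mergeResult, dropping the old windows.
            }
          });
    }
  }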

On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský  wrote:
>
>> Hi Robert,
>>
>> there is a comment around that which states that the current solution
>> should be more efficient. I'd say that (for non-merging windows) it would
>> be best to first explode windows, and only after that do combineByKey(key &
>> window). Merging windows would have to be handled the way it is, or maybe
>> it would be better to split this into
>>
>>  1) assign windows to elements
>>
>>  2) combineByKeyAndWindow
>>
>> Jan
>> On 6/13/19 3:51 PM, Robert Bradshaw wrote:
>>
>> I think the problem is that it never leverages the (traditionally much
>> cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it always
>> calls CombineFn.mergeAccumulators(old_accumulator,
>> CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be
>> feasible to fix this while still handling windowing correctly. (The
>> end-of-window timestamp combiner could also be optimized because the
>> timestamp need not be tracked throughout in that case.)
>>
>> On the other hand, once we move everything to portability, we'll
>> probably toss all this code that uses Spark's combiner lifting (instead
>> using the GroupingCombiningTable that's implemented naively in Beam, as we
>> do for Python to avoid fusion breaks).
>>
>> On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> I have hit a performance issue with the Spark runner that seems to be related
>>> to its current Combine.perKey implementation. I'll try to summarize what
>>> I have found in the code:
>>>
>>>   - Combine.perKey uses Spark's combineByKey primitive, which is pretty
>>> similar to the definition of CombineFn
>>>
>>>   - it holds all elements as WindowedValues, and uses
>>> Iterable<WindowedValue<...>> as accumulator (each WindowedValue holds accumulated state for each window) ...

Re: Jira permissions

2019-06-13 Thread Lukasz Cwik
Welcome! I have added you as a contributor and assigned BEAM-7542 to you.

On Wed, Jun 12, 2019 at 9:18 PM Viktor Gerdin  wrote:

>
>Hello
>
>My name is Viktor
>
>I've encountered an issue (BEAM-7542) and would like to
>contribute a solution. I'd like to be added as a Jira contributor so
>that I can assign issues to myself. My ASF Jira Username is viktor.gerdin.
>
>Regards
>
>
>
>
>
>--
>
>
>V
>
>
>


Re: SparkRunner Combine.perKey performance

2019-06-13 Thread Jan Lukavský

On 6/13/19 4:31 PM, Robert Bradshaw wrote:
The comment fails to take into account the asymmetry between calling 
addInput vs. mergeAccumulators. It also focuses a lot on the 
asymptotic behavior, when the most common behavior is likely having a 
single (global) window.


Yes, that occurred to me too. There are more questions here:

 a) it would help if WindowedValue#explodeWindows would return List 
instead of Iterable, because then optimizations would be possible based 
on the number of windows (e.g. when there is only a single window, there is no 
need to sort anything). This should be a simple change, as it already is a 
List.


 b) I'm a little confused why it is better to keep key with all its 
windows in single WindowedValue in general. My first thoughts would 
really be - explode all windows to (key, window) pairs, and use these as 
keys as much as you can - there is an obvious small drawback: this might be 
less efficient for windowing strategies with high number of windows per 
element (sliding windows with small slide compared to window length). 
But that could be added to the WindowFn and decision could be made 
accordingly.




Were I to implement this I would let the accumulator be a hashmap 
Window -> Value/Timestamp. For the non-merging, when a WindowedValue 
with N windows comes in, simply do O(N) lookup+addInput calls. 
Merging these accumulator hashmaps is pretty easy. For merging 
windows, I would first invoke the WindowFn to determine which old 
windows + new windows merged into bigger windows, and then construct 
the values of the bigger windows with the appropriate 
createAccumulator, addInput, and/or mergeAccumulators calls, depending 
on how many old vs. new values there are. This is a merging call + 
O(N) additions.
Yep, that sounds like it could work. For a single window (global, 
tumbling) the approach above would probably still be more efficient.


BTW, the code is broken because it hard codes SessionWindows rather 
than calling WindowFn.mergeWindows(...). This is a correctness, not 
just a performance, bug :(.

Can you point it out to me in the code?




On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský wrote:


Hi Robert,

there is a comment around that which states that the current
solution should be more efficient. I'd say that (for non-merging
windows) it would be best to first explode windows, and only after
that do combineByKey(key & window). Merging windows would have to
be handled the way it is, or maybe it would be better to split this into

 1) assign windows to elements

 2) combineByKeyAndWindow

Jan

On 6/13/19 3:51 PM, Robert Bradshaw wrote:

I think the problem is that it never leverages the (traditionally
much cheaper) CombineFn.addInput(old_accumulator, new_value).
Instead, it always calls
CombineFn.mergeAccumulators(old_accumulator,
CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It
should be feasible to fix this while still handling windowing
correctly. (The end-of-window timestamp combiner could also be
optimized because the timestamp need not be tracked throughout in
that case.)

On the other hand, once we move everything to portability,
we'll probably toss all this code that uses Spark's combiner
lifting (instead using the GroupingCombiningTable that's
implemented naively in Beam, as we do for Python to avoid fusion
breaks).

On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <je...@seznam.cz> wrote:

Hi,

I have hit a performance issue with the Spark runner that seems
to be related
to its current Combine.perKey implementation. I'll try to
summarize what
I have found in the code:

  - Combine.perKey uses Spark's combineByKey primitive, which
is pretty
similar to the definition of CombineFn

  - it holds all elements as WindowedValues, and uses
Iterable<WindowedValue<...>> as accumulator (each
WindowedValue holds
accumulated state for each window)

  - the update function is implemented as

   1) convert value to Iterable<WindowedValue<...>>

   2) merge accumulators for each window

The logic inside createAccumulator and mergeAccumulators is
quite
non-trivial. The result of profiling is that two frames where
the code
spends most of the time are:

  41633930798   33.18% 4163

org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
  19990682441   15.93% 1999

org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable

A simple change on code from

  PCollection<..> input = ...

  input.apply(Combine.perKey(...))

to

  PCollection<..> input = ...

  input

    .apply(GroupByKey.create())

    .apply(Combine.groupedValues(...))

had a drastic impact on the job run time (minutes as opposed to hours, after which the first job didn't even finish!). ...

Re: DRAFT - Apache Beam Board Report - June '19

2019-06-13 Thread Kenneth Knowles
Thanks for seeding it, Luke! I added the autogenerated info from
reporter.apache.org and the boilerplate description, etc.

The main section to fill out is "Activity" where we highlight events and
milestones in Beam's development. Anyone who has something big from the
last 3 months, please suggest. We need to keep the report short, so just
one bullet for very significant things. You can see past reports for the
idea: https://whimsy.apache.org/board/minutes/Beam.html

Kenn

On Thu, Jun 13, 2019 at 7:47 AM Lukasz Cwik  wrote:

> Hi all
>
> Our next project report to the ASF Board of Directors is due June 14th.
> I've seeded a draft here:
> https://docs.google.com/document/d/1GY16lzVKL-mPh4M560AtqPAB1kXEptkhcBymvFr-4z8/edit?usp=sharing
>
> Please help to eliminate all the TODOs by adding suggestions.
>
> Luke
>


DRAFT - Apache Beam Board Report - June '19

2019-06-13 Thread Lukasz Cwik
Hi all

Our next project report to the ASF Board of Directors is due June 14th.
I've seeded a draft here:
https://docs.google.com/document/d/1GY16lzVKL-mPh4M560AtqPAB1kXEptkhcBymvFr-4z8/edit?usp=sharing

Please help to eliminate all the TODOs by adding suggestions.

Luke


Re: SparkRunner Combine.perKey performance

2019-06-13 Thread Robert Bradshaw
The comment fails to take into account the asymmetry between calling
addInput vs. mergeAccumulators. It also focuses a lot on the asymptotic
behavior, when the most common behavior is likely having a single (global)
window.

Were I to implement this I would let the accumulator be a hashmap Window ->
Value/Timestamp. For the non-merging, when a WindowedValue with N windows
comes in, simply do O(N) lookup+addInput calls. Merging these accumulator
hashmaps is pretty easy. For merging windows, I would first invoke the
WindowFn to determine which old windows + new windows merged into bigger
windows, and then construct the values of the bigger windows with the
appropriate createAccumulator, addInput, and/or mergeAccumulators calls,
depending on how many old vs. new values there are. This is a merging call
+ O(N) additions.
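
(A sketch of the non-merging update step described above, leaving out the timestamp tracking;
class and method names are illustrative, but CombineFn and WindowedValue are the real Beam
types.)

  import java.util.Map;
  import org.apache.beam.sdk.transforms.Combine.CombineFn;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.util.WindowedValue;

  class PerWindowAccumulation {
    // The combiner's accumulator is a map from window to the CombineFn's accumulator.
    // For an element in N windows this is N cheap addInput() calls, not a mergeAccumulators().
    static <InputT, AccumT> void add(
        CombineFn<InputT, AccumT, ?> fn,
        Map<BoundedWindow, AccumT> accumulators,
        WindowedValue<InputT> element) {
      for (BoundedWindow window : element.getWindows()) {
        AccumT acc = accumulators.get(window);
        if (acc == null) {
          acc = fn.createAccumulator();
        }
        accumulators.put(window, fn.addInput(acc, element.getValue()));
      }
    }
  }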

BTW, the code is broken because it hard codes SessionWindows rather than
calling WindowFn.mergeWindows(...). This is a correctness, not just a
performance, bug :(.



On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský  wrote:

> Hi Robert,
>
> there is a comment around that which states that the current solution
> should be more efficient. I'd say that (for non-merging windows) it would
> be best to first explode windows, and only after that do combineByKey(key &
> window). Merging windows would have to be handled the way it is, or maybe
> it would be better to split this into
>
>  1) assign windows to elements
>
>  2) combineByKeyAndWindow
>
> Jan
> On 6/13/19 3:51 PM, Robert Bradshaw wrote:
>
> I think the problem is that it never leverages the (traditionally much
> cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it always
> calls CombineFn.mergeAccumulators(old_accumulator,
> CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be
> feasible to fix this while still handling windowing correctly. (The
> end-of-window timestamp combiner could also be optimized because the
> timestamp need not be tracked throughout in that case.)
>
> On the other hand, once we move everything to portability, we'll
> probably toss all this code that uses Spark's combiner lifting (instead
> using the GroupingCombiningTable that's implemented naively in Beam, as we
> do for Python to avoid fusion breaks).
>
> On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský  wrote:
>
>> Hi,
>>
>> I have hit a performance issue with the Spark runner that seems to be related
>> to its current Combine.perKey implementation. I'll try to summarize what
>> I have found in the code:
>>
>>   - Combine.perKey uses Spark's combineByKey primitive, which is pretty
>> similar to the definition of CombineFn
>>
>>   - it holds all elements as WindowedValues, and uses
>> Iterable<WindowedValue<...>> as accumulator (each WindowedValue holds
>> accumulated state for each window)
>>
>>   - the update function is implemented as
>>
>>1) convert value to Iterable<WindowedValue<...>>
>>
>>2) merge accumulators for each window
>>
>> The logic inside createAccumulator and mergeAccumulators is quite
>> non-trivial. The result of profiling is that two frames where the code
>> spends most of the time are:
>>
>>   41633930798   33.18% 4163
>>
>> org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
>>   19990682441   15.93% 1999
>>
>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
>>
>> A simple change on code from
>>
>>   PCollection<..> input = ...
>>
>>   input.apply(Combine.perKey(...))
>>
>> to
>>
>>   PCollection<..> input = ...
>>
>>   input
>>
>> .apply(GroupByKey.create())
>>
>> .apply(Combine.groupedValues(...))
>>
>> had a drastic impact on the job run time (minutes as opposed to hours,
>> after which the first job didn't even finish!).
>>
>> I think I understand the reason why the current logic is implemented as
>> it is: it has to deal with merging windows. But the consequences seem to
>> be that it renders the implementation very inefficient.
>>
>> Has anyone seen similar behavior? Does my analysis of the problem seem
>> correct?
>>
>> Jan
>>
>>
>>


Re: SparkRunner Combine.perKey performance

2019-06-13 Thread Jan Lukavský

Hi Robert,

there is a comment around that which states that the current solution 
should be more efficient. I'd say that (for non-merging windows) it 
would be best to first explode windows, and only after that do 
combineByKey(key & window). Merging windows would have to be handled the 
way it is, or maybe it would be better to split this into the two steps 
below (a rough sketch of step 1 follows the list):


 1) assign windows to elements

 2) combineByKeyAndWindow
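
A rough sketch of step 1, building on WindowedValue#explodeWindows mentioned above (the helper
class and the composite-key shape are just an illustration):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.util.WindowedValue;
  import org.apache.beam.sdk.values.KV;

  class ExplodeThenCombine {
    // Turns one multi-window element into per-(key, window) records, so a plain
    // combineByKey over the composite key handles step 2 for non-merging WindowFns.
    static <K, V> List<KV<KV<K, BoundedWindow>, V>> explode(WindowedValue<KV<K, V>> element) {
      List<KV<KV<K, BoundedWindow>, V>> out = new ArrayList<>();
      for (WindowedValue<KV<K, V>> single : element.explodeWindows()) {
        BoundedWindow window = single.getWindows().iterator().next(); // exactly one after explode
        out.add(KV.of(KV.of(single.getValue().getKey(), window), single.getValue().getValue()));
      }
      return out;
    }
  }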

Jan

On 6/13/19 3:51 PM, Robert Bradshaw wrote:
I think the problem is that it never leverages the (traditionally much 
cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it 
always calls CombineFn.mergeAccumulators(old_accumulator, 
CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It 
should be feasible to fix this while still handling windowing 
correctly. (The end-of-window timestamp combiner could also be 
optimized because the timestamp need not be tracked throughout in that 
case.)


On the other hand, once we move everything to portability, we'll 
probably toss all this code that uses Spark's combiner lifting (instead 
using the GroupingCombiningTable that's implemented naively in Beam, 
as we do for Python to avoid fusion breaks).


On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský wrote:


Hi,

I have hit a performance issue with the Spark runner that seems to
be related
to its current Combine.perKey implementation. I'll try to
summarize what
I have found in the code:

  - Combine.perKey uses Spark's combineByKey primitive, which is
pretty
similar to the definition of CombineFn

  - it holds all elements as WindowedValues, and uses
Iterable<WindowedValue<...>> as accumulator (each WindowedValue holds
accumulated state for each window)

  - the update function is implemented as

   1) convert value to Iterable<WindowedValue<...>>

   2) merge accumulators for each window

The logic inside createAccumulator and mergeAccumulators is quite
non-trivial. The result of profiling is that two frames where the
code
spends most of the time are:

  41633930798   33.18% 4163
org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
  19990682441   15.93% 1999

org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable

A simple change on code from

  PCollection<..> input = ...

  input.apply(Combine.perKey(...))

to

  PCollection<..> input = ...

  input

    .apply(GroupByKey.create())

    .apply(Combine.groupedValues(...))

had a drastic impact on the job run time (minutes as opposed to
hours,
after which the first job didn't even finish!).

I think I understand the reason why the current logic is
implemented as
it is: it has to deal with merging windows. But the consequences
seem to
be that it renders the implementation very inefficient.

Has anyone seen similar behavior? Does my analysis of the problem
seem
correct?

Jan




Re: SparkRunner Combine.perKey performance

2019-06-13 Thread Robert Bradshaw
I think the problem is that it never leverages the (traditionally much
cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it always
calls CombineFn.mergeAccumulators(old_accumulator,
CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be
feasible to fix this while still handling windowing correctly. (The
end-of-window timestamp combiner could also be optimized because the
timestamp need not be tracked throughout in that case.)
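
(In code, the two per-element paths look roughly like this; CombineFn is the real Beam API,
and the two helpers just restate the point above.)

  import java.util.Arrays;
  import org.apache.beam.sdk.transforms.Combine.CombineFn;

  class AccumulationCost {
    // What the Spark translation effectively does for every element today:
    static <InputT, AccumT> AccumT viaMerge(
        CombineFn<InputT, AccumT, ?> fn, AccumT oldAcc, InputT value) {
      AccumT singleton = fn.addInput(fn.createAccumulator(), value); // fresh accumulator per value
      return fn.mergeAccumulators(Arrays.asList(oldAcc, singleton)); // plus a full merge
    }

    // The usually much cheaper per-element path it could use instead:
    static <InputT, AccumT> AccumT viaAddInput(
        CombineFn<InputT, AccumT, ?> fn, AccumT oldAcc, InputT value) {
      return fn.addInput(oldAcc, value);
    }
  }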

On the other hand, once we move everything to portability, we'll
probably toss all this code that uses Spark's combiner lifting (instead
using the GroupingCombiningTable that's implemented naively in Beam, as we
do for Python to avoid fusion breaks).

On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský  wrote:

> Hi,
>
> I have hit a performance issue with the Spark runner that seems to be related
> to its current Combine.perKey implementation. I'll try to summarize what
> I have found in the code:
>
>   - Combine.perKey uses Spark's combineByKey primitive, which is pretty
> similar to the definition of CombineFn
>
>   - it holds all elements as WindowedValues, and uses
> Iterable<WindowedValue<...>> as accumulator (each WindowedValue holds
> accumulated state for each window)
>
>   - the update function is implemented as
>
>1) convert value to Iterable<WindowedValue<...>>
>
>2) merge accumulators for each window
>
> The logic inside createAccumulator and mergeAccumulators is quite
> non-trivial. The result of profiling is that two frames where the code
> spends most of the time are:
>
>   41633930798   33.18% 4163
>
> org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
>   19990682441   15.93% 1999
>
> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
>
> A simple change on code from
>
>   PCollection<..> input = ...
>
>   input.apply(Combine.perKey(...))
>
> to
>
>   PCollection<..> input = ...
>
>   input
>
> .apply(GroupByKey.create())
>
> .apply(Combine.groupedValues(...))
>
> had a drastic impact on the job run time (minutes as opposed to hours,
> after which the first job didn't even finish!).
>
> I think I understand the reason why the current logic is implemented as
> it is: it has to deal with merging windows. But the consequences seem to
> be that it renders the implementation very inefficient.
>
> Has anyone seen similar behavior? Does my analysis of the problem seem
> correct?
>
> Jan
>
>
>


Re: [DISCUSS] Portability representation of schemas

2019-06-13 Thread Robert Bradshaw
On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax  wrote:

>
> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles  wrote:
>
>> Can we choose a first step? I feel there's consensus around:
>>
>>  - the basic idea of what a schema looks like, ignoring logical types or
>> SDK-specific bits
>>  - the version of logical type which is a standardized URN+payload plus a
>> representation
>>
>> Perhaps we could commit this and see what it looks like to try to use it?
>>
>
+1


> It also seems like there might be consensus around the idea of each of:
>>
>>  - a coder that simply encodes rows; its payload is just a schema; it is
>> minimalist, canonical
>>
>  - a coder that encodes a non-row using the serialization format of a row;
>> this has to be a coder (versus Convert transforms) so that to/from row
>> conversions can be elided when primitives are fused (just like to/from
>> bytes is elided)
>>
>
So, to make it concrete, in the Beam protos we would have an
[Elementwise]SchemaCoder whose single parameterization would be FieldType,
whose definition is in terms of URN + payload + components (+
representation, for non-primitive types, some details TBD there). It could
be deserialized into various different Coder instances (an SDK
implementation detail) in an SDK depending on the type. One of the most
important primitive field types is Row (aka Struct).
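
(To illustrate the shape being proposed; this is not the actual proto definition, and the URNs
below are made up.)

  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;

  // Illustrative only: a field type is a type-constructor URN, an optional payload, and
  // component types; a row is just one (particularly important) type constructor.
  class FieldTypeSpec {
    final String urn;                      // e.g. "beam:field_type:int64:v1" (made-up URN)
    final byte[] payload;                  // constructor parameters, e.g. a logical type's payload
    final List<FieldTypeSpec> components;  // e.g. an array's element type, a map's key/value types

    FieldTypeSpec(String urn, byte[] payload, List<FieldTypeSpec> components) {
      this.urn = urn;
      this.payload = payload;
      this.components = components;
    }

    // e.g. "array of int64":
    static FieldTypeSpec exampleArrayOfInt64() {
      FieldTypeSpec int64 =
          new FieldTypeSpec("beam:field_type:int64:v1", new byte[0], Collections.emptyList());
      return new FieldTypeSpec("beam:field_type:array:v1", new byte[0], Arrays.asList(int64));
    }
  }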

We would define a byte encoding for each primitive type. We *could* choose
to simply require that the encoding of any non-row primitive is the same as
its encoding in a single-member row, but that's not necessary.

In the short term, the window/timestamp/pane info would still live outside
via an enclosing WindowCoder, as it does now, not blocking on a desirable
but still-to-be-figured-out unification at that level.

This seems like a good path forward.

Actually this doesn't make sense to me. I think from the portability
> perspective, all we have is schemas - the rest is just a convenience for
> the SDK. As such, I don't think it makes sense at all to model this as a
> Coder.
>

Coders and schemas are mutually exclusive on PCollections, and completely
specify type information, so I think it makes sense to reuse this (as we're
currently doing) until we can get rid of coders altogether.

(At execution time, we would generalize the notion of a coder to indicate
how *batches* of elements are encoded, not just how individual elements are
encoded. Here we have the option of letting the runner pick depending on
the use (e.g. elementwise for key lookups vs. arrow for bulk data channel
transfer vs ???, possibly with parameters like "preferred batch size") or
standardizing on one physical byte representation for all communication
over the boundary.)


>
>
>>
>> Can we also just have both of these, with different URNs?
>>
>> Kenn
>>
>> On Wed, Jun 12, 2019 at 3:57 PM Reuven Lax  wrote:
>>
>>>
>>>
>>> On Wed, Jun 12, 2019 at 3:46 PM Robert Bradshaw 
>>> wrote:
>>>
 On Tue, Jun 11, 2019 at 8:04 PM Kenneth Knowles 
 wrote:

>
> I believe the schema registry is a transient construction-time
> concept. I don't think there's any need for a concept of a registry in the
> portable representation.
>
> I'd rather urn:beam:schema:logicaltype:javasdk not be used whenever
>> one has (say) a Java POJO as that would prevent other SDKs from
>> "understanding" it as above (unless we had a way of declaring it as "just
>> an alias/wrapper").
>>
>
> I didn't understand the example I snipped, but I think I understand
> your concern here. Is this what you want? (a) something presented as a 
> POJO
> in Java (b) encoded to a row, but still decoded to the POJO and (c)
> non-Java SDK knows that it is "just a struct" so it is safe to mess about
> with or even create new ones. If this is what you want it seems 
> potentially
> useful, but also easy to live without. This can also be done entirely
> within the Java SDK via conversions, leaving no logical type in the
> portable pipeline.
>

 I'm imagining a world where someone defines a PTransform that takes a
 POJO for a constructor, and consumes and produces a POJO, and is now usable
 from Go with no additional work on the PTransform author's part.  But maybe
 I'm thinking about this wrong and the POJO <-> Row conversion is part of
 the @ProcesssElement magic, not encoded in the schema itself.

>>>
>>> The user's output would have to explicitly have a schema. They would somehow
>>> have to tell Beam to infer a schema from the output POJO (e.g. one way to
>>> do this is to annotate the POJO with the @DefaultSchema annotation).  We
>>> don't currently magically turn a POJO into a schema unless we are asked to
>>> do so.
>>>
>>
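
(For reference, asking Beam to infer a schema from a POJO looks roughly like this; the class,
its fields, and the trailing comment are illustrative.)

  import org.apache.beam.sdk.schemas.JavaFieldSchema;
  import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

  // Opting in: Beam infers a schema from the public fields of the POJO.
  @DefaultSchema(JavaFieldSchema.class)
  public class Transaction {
    public String bank;
    public double purchaseAmount;
  }

  // A PCollection<Transaction> then carries a SchemaCoder, so schema-aware transforms (and,
  // per this thread, the portable row encoding at the expansion boundary) can treat it as a row.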


SparkRunner Combine.perKey performance

2019-06-13 Thread Jan Lukavský

Hi,

I have hit a performance issue with the Spark runner that seems to be related 
to its current Combine.perKey implementation. I'll try to summarize what 
I have found in the code:


 - Combine.perKey uses Spark's combineByKey primitive, which is pretty 
similar to the definition of CombineFn


 - it holds all elements as WindowedValues, and uses 
Iterable<WindowedValue<...>> as accumulator (each WindowedValue holds 
accumulated state for each window)


 - the update function is implemented as

  1) convert value to Iterable<WindowedValue<...>>

  2) merge accumulators for each window

The logic inside createAccumulator and mergeAccumulators is quite 
non-trivial. The result of profiling is that two frames where the code 
spends most of the time are:


 41633930798   33.18% 4163 
org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
 19990682441   15.93% 1999 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable


A simple change on code from

 PCollection<..> input = ...

 input.apply(Combine.perKey(...))

to

 PCollection<..> input = ...

 input

   .apply(GroupByKey.create())

   .apply(Combine.groupedValues(...))

had a drastic impact on the job run time (minutes as opposed to hours, 
after which the first job didn't even finish!).


I think I understand the reason why the current logic is implemented as 
it is: it has to deal with merging windows. But the consequences seem to 
be that it renders the implementation very inefficient.


Has anyone seen similar behavior? Does my analysis of the problem seem 
correct?


Jan




Re: Streaming pipelines in all SDKs!

2019-06-13 Thread Łukasz Gajowy
Created a PR: https://github.com/apache/beam/pull/8846

Wed, Jun 12, 2019 at 11:40 AM Ismaël Mejía  wrote:

> Can you please add this to the design documents webpage.
> https://beam.apache.org/contribute/design-documents/
>
> On Fri, May 10, 2019 at 11:50 AM Maximilian Michels 
> wrote:
> >
> > > So, FlinkRunner has some sort of special support for executing
> UnboundedSource via the runner in the portable world? I see a transform
> override for bounded sources in PortableRunner [1] but nothing for
> unbounded sources.
> >
> > It's in the translation code:
> >
> https://github.com/apache/beam/blob/6679b00138a5b82a6a55e7bc94c453957cea501c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L216
> >
> > For migration I think that's a valid path, especially because Runners
> > already have the translation code in place. We can later swap out the
> > UnboundedSource translation with the SDF wrapper.
> >
> > -Max
> >
> > On 09.05.19 22:46, Robert Bradshaw wrote:
> > > From: Chamikara Jayalath 
> > > Date: Thu, May 9, 2019 at 7:49 PM
> > > To: dev
> > >
> > >> From: Maximilian Michels 
> > >> Date: Thu, May 9, 2019 at 9:21 AM
> > >> To: 
> > >>
> > >>> Thanks for sharing your ideas for load testing!
> > >>>
> >  According to other contributors knowledge/experience: I noticed
> that streaming with KafkaIO is currently supported by wrapping the
> ExternalTransform in Python SDK. Do you think that streaming pipelines will
> "just work" with the current state of portability if I do the same for
> UnboundedSyntheticSource or is there something else missing?
> > >>>
> > >>> Basically yes, but it requires a bit more effort than just wrapping
> > >>> about ExternalTransform. You need to provide an
> ExternalTransformBuilder
> > >>> for the transform to be configured externally.
> > >>>
> > >>> In portability UnboundedSources can only be supported via SDF. To
> still
> > >>> be able to use legacy IO which uses UnboundedSource the Runner has to
> > >>> supply this capability (which the Flink Runner supports). This will
> > >>> likely go away if we have an UnboundedSource SDF Wrapper :)
> > >>
> > >>
> > >> So, FlinkRunner has some sort of special support for executing
> UnboundedSource via the runner in the portable world? I see a transform
> override for bounded sources in PortableRunner [1] but nothing for
> unbounded sources.
> > >>
> > >> Agreed, we cannot properly support cross-language unbounded
> sources until we have SDF and a proper unbounded-source-to-SDF wrapper.
> > >
> > > That is correct. Go will need SDF support as well.
> > >
> > > As for waiting on implementing the expansion service: except for the
> > > vending of extra artifacts (which will be an extension), we discussed
> > > this earlier and it's considered stable and ready to build on now.
> > >
>