Re: Why is there no standard boolean coder?

2019-09-29 Thread Chad Dombrova
I’m planning on porting the existing Java coder to Python. Any objections
to that?

-chad
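For context, a rough sketch (illustrative only, class name invented) of the single-byte
encoding I believe the existing Java BooleanCoder uses - true as byte 1, false as byte 0 -
which is presumably what a Python port would mirror:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Illustrative stand-in for Beam's Java BooleanCoder, not the real class.
public class BooleanCoderSketch {

  // Encode a boolean as a single byte: 1 for true, 0 for false.
  public void encode(Boolean value, OutputStream outStream) throws IOException {
    outStream.write(value ? 1 : 0);
  }

  // Decode a single byte back into a boolean.
  public Boolean decode(InputStream inStream) throws IOException {
    int b = inStream.read();
    if (b < 0) {
      throw new IOException("unexpected end of stream while decoding a Boolean");
    }
    return b != 0;
  }
}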


On Sun, Sep 29, 2019 at 1:02 PM Robert Burke  wrote:

> +1
>
> I'm happy to whip together the Go SDK version once the encoding has been
> concretely decided.
>
> On Fri, Sep 27, 2019, 6:07 PM Chad Dombrova  wrote:
>
>>
>> It would still be a standard coder - the distinction I'm proposing is
>>> that there are certain coders that _must_ be implemented by a new
>>> runner/sdk (for example windowedvalue, varint, kv, ...) since they are
>>> important for SDK - runner communication, but now we're starting to
>>> standardize coders that are useful for cross-language and schemas.
>>>
>>
>> Got it.  Sounds good.
>>
>> -chad
>>
>>
>>


Re: Why is there no standard boolean coder?

2019-09-29 Thread Robert Burke
+1

I'm happy to whip together the Go SDK version once the encoding has been
concretely decided.

On Fri, Sep 27, 2019, 6:07 PM Chad Dombrova  wrote:

>
> It would still be a standard coder - the distinction I'm proposing is that
>> there are certain coders that _must_ be implemented by a new runner/sdk
>> (for example windowedvalue, varint, kv, ...) since they are important for
>> SDK - runner communication, but now we're starting to standardize coders
>> that are useful for cross-language and schemas.
>>
>
> Got it.  Sounds good.
>
> -chad
>
>
>


Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-29 Thread Reuven Lax
Jan,

The fact that the annotation on the ParDo "changes" the GroupByKey
implementation is very specific to the Spark runner implementation. You can
imagine another runner that simply writes out files in HDFS to implement a
GroupByKey - this GroupByKey implementation is agnostic to whether the result
will be reiterated or not; in this case it is very much the ParDo
implementation that changes to implement a reiterable. I think you don't
like the fact that an annotation on the ParDo will have a non-local effect
on the implementation of the GroupByKey upstream. However, arguably the
non-local effect is just a quirk of how the Spark runner is implemented -
for other runners the effect might be local.

In general I sympathize with the worry about non-local effects. Beam is
already full of them (e.g. a Window.into statement affects downstream
GroupByKeys). In each case where they were added there was extensive debate
and discussion (Windowing semantics were debated for many months), exactly
because there was concern over adding these non-local effects. In every
case, no other good solution could be found. For the case of windowing for
example, it was often easy to propose simple local APIs (e.g. just pass the
window fn as a parameter to GroupByKey), however all of these local
solutions ended up not working for important use cases when we analyzed
them more deeply.
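
To make that concrete, a small illustrative Java sketch of the non-local effect being
described - the class and method names are invented, and 'input' stands for any upstream
PCollection:

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class WindowingEffectExample {
  // 'input' stands for any PCollection<KV<String, Long>> built earlier in the pipeline.
  static PCollection<KV<String, Iterable<Long>>> groupPerMinute(
      PCollection<KV<String, Long>> input) {
    // The windowing choice is made here ...
    PCollection<KV<String, Long>> windowed =
        input.apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1))));

    // ... but GroupByKey itself takes no windowing parameter, yet its output is now
    // grouped per key *and* per one-minute window because of the upstream Window.into.
    return windowed.apply(GroupByKey.<String, Long>create());
  }
}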

As you mentioned below, I do think it's perfectly reasonable for a DSL to
impose its own semantics. Scio already does this - the raw Beam API is used
by a DSL as a substrate, but the DSL does not need to blindly mirror the
semantics of the raw Beam API - at least in my opinion!

Reuven

On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský  wrote:

> I understand the concerns. Still, it looks a little like we want to be
> able to modify the behavior of an object from inside a submodule - much as
> if my subprogram accepted a Map interface but internally said "hey, this is
> supposed to be a HashMap, please change it accordingly". Because of how the
> pipeline is constructed, we can do that; the question is whether there
> really isn't a better solution.
>
> What I do not like about the proposed solution:
>
>  1) specifying that the grouped elements are supposed to be iterated only
> once can be done only on a ParDo, although there are other (higher-level)
> PTransforms that can consume the output of GBK
>
>  2) the annotation on ParDo is by definition generic - i.e. it can be used
> on input which is not the output of a GBK, which makes no sense
>
>  3) we modify the behavior to unlock some optimizations (or to change the
> behavior of the GBK itself), and users will not understand that
>
>  4) the annotation somewhat arbitrarily modifies the data types passed, which
> is counter-intuitive and will be a source of confusion
>
> I think that a solution that solves the above problems (but brings some new
> ones, as always :-)) could be to change the output of GBK from
> PCollection<KV<K, Iterable<V>>> to a GroupedPCollection. That way we can
> control which operators consume the grouping (and how), and we can enable
> these transforms to specify additional parameters (like how they want to
> consume the grouping). It is obviously a breaking change (although it can
> probably be made backwards compatible) and it would very likely involve
> substantial work. But maybe there are other options not yet discussed.
>
> Jan
> On 9/28/19 6:46 AM, Reuven Lax wrote:
>
> In many cases, the writer of the ParDo has no access to the GBK (e.g. the
> GBK is hidden inside an upstream PTransform that they cannot modify). This
> is the same reason why RequiresStableInput was made a property of the
> ParDo, because the GroupByKey is quite often inaccessible.
>
> The car analogy doesn't quite apply here, because the runner does have a
> full view of the pipeline so can satisfy all constraints. The car
> dealership generally cannot read your mind (thankfully!), so you have to
> specify what you want. Or to put it another way, the various transforms in
> a Beam pipeline do not live in isolation. The full pipeline graph is what
> is executed, and the runner already has to analyze the full graph to run
> the pipeline (as well as to optimize the pipeline).
>
> Reuven
>
> On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský  wrote:
>
>> I'd suggest Stream instead of Iterator - it has the same semantics and a
>> much better API.
>>
>> Still not sure what is wrong with letting the GBK decide this. I have
>> an analogy - if I decide to buy a car, I have to decide *what* car I'm
>> going to buy (by thinking about how I'm going to use it) *before* I buy it. I
>> cannot just buy "a car" and then change it from a minivan to a sports car based
>> on my actual need. Same with the GBK - if I want to be able to reiterate
>> the result, then I should tell it in advance.
>>
>> Jan
>> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>>
>> Good point about sibling fusion requiring this.
>>
>> The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply that
>> the output iterable can be iterated 

RE: Multiple iterations after GroupByKey with SparkRunner

2019-09-29 Thread Gershi, Noam
Hi,

Thanx for the reply.

So – re-iteration over grouped elements is runner-dependent. Flink & Dataflow 
allow it, while Spark doesn't.

Since we are also investigating the runners here, does anyone have a list of 
which runners allow or don't allow re-iteration?

Noam


From: [apache.org] Kenneth Knowles 
Sent: Friday, September 27, 2019 7:26 PM
To: dev
Cc: user
Subject: Re: Multiple iterations after GroupByKey with SparkRunner

I am pretty surprised that we do not have a @Category(ValidatesRunner) test in 
GroupByKeyTest that iterates multiple times. That is a major oversight. We 
should have this test, and it can be disabled by the SparkRunner's 
configuration.
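
Roughly something like the following sketch - not actual GroupByKeyTest code, and the
class and test names are invented; it assumes the usual TestPipeline rule and the
ValidatesRunner category:

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class GroupByKeyReiterationTest {
  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  @Category(ValidatesRunner.class)
  public void testGroupByKeyCanBeIteratedTwice() {
    PCollection<KV<String, Integer>> input =
        p.apply(
            Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));

    PCollection<Integer> sums =
        input
            .apply(GroupByKey.<String, Integer>create())
            .apply(
                ParDo.of(
                    new DoFn<KV<String, Iterable<Integer>>, Integer>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        int firstPass = 0;
                        int secondPass = 0;
                        // First iteration over the grouped values.
                        for (int v : c.element().getValue()) {
                          firstPass += v;
                        }
                        // Second iteration over the very same Iterable.
                        for (int v : c.element().getValue()) {
                          secondPass += v;
                        }
                        c.output(firstPass + secondPass);
                      }
                    }));

    PAssert.that(sums).containsInAnyOrder(12);
    p.run().waitUntilFinish();
  }
}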

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax  wrote:
The Dataflow version does not spill to disk. However Spark's design might 
require spilling to disk if you want that to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek  wrote:
Hi,

Spark's GBK is currently implemented using `sortBy(key and 
value).mapPartition(...)` for non-merging windowing in order to support large 
keys and large-scale shuffles. Merging windowing is implemented using standard 
GBK (the underlying Spark impl. uses ListCombiner + Hash Grouping), which is by 
design unable to support large keys.

As Jan noted, the problem with mapPartition is that its UDF receives an Iterator. 
The only option here is to wrap this iterator in one that spills to disk once an 
internal buffer is exceeded (the approach suggested by Reuven). This 
unfortunately comes with a cost in some cases. The best approach would be to 
somehow determine that the user wants multiple iterations and then wrap it in a 
"re-iterator" only if necessary. Does anyone have any ideas on how to approach this?

D.

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax  wrote:
The Beam API was written to support multiple iterations, and there are 
definitely transforms that do so. I believe that CoGroupByKey may do this as 
well with the resulting iterator.

I know that the Dataflow runner is able to handle iterators larger than 
available memory by paging them in from shuffle, which still allows for 
reiterating. It sounds like Spark is less flexible here?

Reuven

On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:

+dev 

Lukasz, why do you think that users expect to be able to iterate grouped elements 
multiple times? Besides the fact that the 'Iterable' type obviously suggests it? The way 
that Spark behaves is pretty much analogous to how MapReduce used to work - in 
certain cases it calls repartitionAndSortWithinPartitions and then does 
mapPartition, which accepts an Iterator - that is because internally it merge-sorts 
pre-sorted segments. This approach makes it possible to GroupByKey data sets that 
are too big to fit into memory (per key).
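
For illustration, a rough sketch of that pattern against the Spark 2.x Java API - the
class, method, 'pairs' RDD and partition count are all placeholders:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

class SortThenMapPartitionsExample {
  // 'pairs' stands for any JavaPairRDD<String, Integer>; 4 partitions chosen arbitrarily.
  static JavaRDD<String> sumPerKey(JavaPairRDD<String, Integer> pairs) {
    JavaPairRDD<String, Integer> sorted =
        pairs.repartitionAndSortWithinPartitions(new HashPartitioner(4));

    // The UDF sees each partition as a single forward-only Iterator of (key, value)
    // tuples clustered by key - there is no per-key Iterable to walk a second time.
    return sorted.mapPartitions((Iterator<Tuple2<String, Integer>> it) -> {
      List<String> out = new ArrayList<>();
      String currentKey = null;
      int sum = 0;
      while (it.hasNext()) {
        Tuple2<String, Integer> kv = it.next();
        if (currentKey != null && !currentKey.equals(kv._1())) {
          out.add(currentKey + ":" + sum);
          sum = 0;
        }
        currentKey = kv._1();
        sum += kv._2();
      }
      if (currentKey != null) {
        out.add(currentKey + ":" + sum);
      }
      return out.iterator();
    });
  }
}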

If multiple iterations should be expected by users, we probably should:

 a) include that in @ValidatesRunner tests

 b) store values in memory on Spark, which will break for certain pipelines

Because of (b) I think that it would be much better to remove this 
"expectation" and clearly document that the Iterable is not supposed to be 
iterated multiple times.

Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:

I pretty much think so, because that is how Spark works. The Iterable inside is 
really an Iterator, which cannot be iterated multiple times.

Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to iterate the GBK output multiple times 
even from within the same ParDo.
Is this something that Beam on Spark Runner never supported?

On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:

Hi Gershi,

could you please outline the pipeline you are trying to execute? Basically, you 
cannot iterate the Iterable multiple times in a single ParDo. It should be 
possible, though, to apply multiple ParDos to the output of the GroupByKey.
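
For example, a rough sketch (class, method and transform names invented, types chosen
only for illustration):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class MultipleParDosAfterGbk {
  static void expand(PCollection<KV<String, Integer>> input) {
    PCollection<KV<String, Iterable<Integer>>> grouped =
        input.apply(GroupByKey.<String, Integer>create());

    // Each ParDo gets its own single pass over the grouped values, so no
    // individual DoFn ever needs to re-iterate the Iterable.
    PCollection<Integer> sums =
        grouped.apply("SumPerKey", ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Integer>() {
          @ProcessElement
          public void process(ProcessContext c) {
            int sum = 0;
            for (int v : c.element().getValue()) {
              sum += v;
            }
            c.output(sum);
          }
        }));

    PCollection<Integer> counts =
        grouped.apply("CountPerKey", ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Integer>() {
          @ProcessElement
          public void process(ProcessContext c) {
            int count = 0;
            for (Integer ignored : c.element().getValue()) {
              count++;
            }
            c.output(count);
          }
        }));
  }
}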

Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:
Hi,

I want to iterate multiple times over the Iterable (the output of the GroupByKey 
transformation). When my runner is SparkRunner, I get an exception:

Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated 
more than once,otherwise there could be data lost
at 
org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
at