Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-04 Thread Kenneth Knowles
The DoFnSignature is where the information "this ParDo only needs a
one-shot iteration" would be recorded. This is what enables a runner to use the
GBKOneShot in place of a full GBK.
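
For illustration, a runner consulting such a signature might look roughly like
the sketch below; DoFnSignatures.getSignature() exists in the Java SDK today,
but the requiresOneShotIteration() flag and the GBKOneShot translation are
hypothetical placeholders for the idea being discussed:

// Hypothetical sketch - requiresOneShotIteration() and GBKOneShot do not exist.
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
if (signature.requiresOneShotIteration()) {
  // The consumer promises a single pass, so the runner may translate the
  // upstream grouping as a cheaper, non-reiterable GBKOneShot.
} else {
  // Otherwise the runner must provide a fully reiterable GBK result.
}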

Kenn

On Fri, Oct 4, 2019 at 1:13 AM Reuven Lax  wrote:

> Yes - this approach puts compatibility checking on the user. However we
> could provide another way for the ParDo to "advertise" the set of states it
> will access. This is similar to what Kenn proposed: today there is a
> DoFnSignature object that is inferred
> reflectively based on annotations. However, if there were an API to modify
> the DoFnSignature, then a DSL could simply use that API to list a set of
> state readers.
>
> Reuven
>
>
>
> On Fri, Oct 4, 2019 at 1:03 AM Jan Lukavský  wrote:
>
>> +1
>>
>> But I'd warn a little against this kind of absolute freedom for the
>> process() method. It should probably remain the case that all states are
>> declared before any element is processed, because otherwise it would be hard
>> (if not impossible) to do any compatibility checking of state upon pipeline
>> upgrades.
>>
>> Jan
>> On 10/4/19 9:47 AM, Reuven Lax wrote:
>>
>> IMO the fact that Stateful ParDo requires compile-time annotations isn't
>> the biggest problem - it's that it requires a static set of them, one for
>> each state. This is fine for specific user code, but we really should add
>> the ability to pass in a StateAccessor object to a DoFn that allows the
>> DoFn to dynamically create different state objects. Something like the
>> following:
>>
>> public void process(StateAccessor stateAccessor, ...) {
>>   stateAccessor.getValueState("state1", TypeDescriptors.ints()).get();
>>   stateAccessor.getMapState("state2", TypeDescriptors.strings(),
>>       TypeDescriptors.ints()).put(...);
>>   // etc.
>> }
>>
>> This would be a bit less type safe than the current approach (someone
>> could try and fetch the same state twice with different types). However it
>> would be much friendlier to DSLs, and indeed any "generic" PTransform that
>> does not statically know all of its states at compile time.
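
For contrast, a minimal sketch of the static, annotation-based declaration the
Beam Java SDK uses today - each state cell is a separate compile-time @StateId
field (the names here are illustrative):

new DoFn<KV<String, Integer>, Void>() {
  // One annotated field per state cell, fixed at compile time.
  @StateId("state1")
  private final StateSpec<ValueState<Integer>> state1Spec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void process(
      ProcessContext c, @StateId("state1") ValueState<Integer> state1) {
    Integer current = state1.read();
    state1.write(current == null ? 1 : current + 1);
  }
};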
>>
>> I think we need similar functionality for timers.
>>
>> Reuven
>>
>> On Fri, Oct 4, 2019 at 12:36 AM Jan Lukavský  wrote:
>>
>>> > So to me the interesting part is that there is a DSL that wants to
>>> support primitives that are strictly weaker than Beam's, in order to *only*
>>> allow the oneshot path. Annotations are quite annoying for DSLs, as you may
>>> have noticed for state & timers, so that is not a good fit. But the
>>> concepts still work. I would suggest pivoting this thread into how to allow
>>> a DSL builder to directly provide a DoFnInvoker with a DoFnSignature in
>>> order to programmatically provide the same information that annotations are
>>> used for. Essentially exposing an IR to DSL authors rather than forcing
>>> them to work with the source language meant for end users. Do you already
>>> have a solution for this today?
>>>
>>> We have been talking about how this would be useful - the fact that
>>> stateful ParDo requires (compile-time) annotations is the main reason why
>>> Euphoria lacks stateful processing support. For that, we need exactly what
>>> you say: to be able to provide the runner directly with a DoFnSignature.
>>> Other solutions would be kind of hackish.
>>>
>>> On the other hand, this isn't directly related to the discussion about
>>> reiterations in GBK, is it? I think DoFnSignatures cannot help us here,
>>> because we need to affect the way GBK is translated in the runner, not the
>>> ParDo. So it quite naturally leads to RBK, or a "streamed GBK". If we
>>> have a consensus on that, I can create JIRAs and move it forward.
>>>
>>> Jan
>>> On 10/3/19 7:19 PM, Kenneth Knowles wrote:
>>>
>>> On Tue, Oct 1, 2019 at 5:35 PM Robert Bradshaw 
>>> wrote:
>>>
 For this specific usecase, I would suggest this be done via
 PTranform URNs. E.g. one could have a GroupByKeyOneShot whose
 implementation is

 input
 .apply(GroupByKey.of())
 .apply(kv -> KV.of(kv.key(), kv.iterator()))

>>>
>>> This is dual to what I was clumsily trying to say in my last paragraph.
>>> But I agree that ReduceByKey is better, if we were to add any new primitive
>>> transform. I very much dislike PCollection<KV<K, Iterator<V>>> for just the
>>> reasons you also mention.
>>>
>>> I think the annotation route where @ProcessElement can accept a
>>> different type of element seems less intrusive and more flexible.
>>>
>>>
 On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský  wrote:

> The car analogy was meant to say, that in real world you have to make
> decision before you take any action. There is no retroactivity possible.
>
> Reuven pointed out, that it is possible (although it seems a little
> weird to me, but that is the only thing I can tell against it :-)), that
> the way a grouped PCollection is produced might be out of control of a
> consuming operator. One example of this might be, that the grouping is
> produced in a submodule (some 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-03 Thread Kenneth Knowles
On Tue, Oct 1, 2019 at 5:35 PM Robert Bradshaw  wrote:

> For this specific usecase, I would suggest this be done via
> PTranform URNs. E.g. one could have a GroupByKeyOneShot whose
> implementation is
>
> input
> .apply(GroupByKey.of())
> .apply(kv -> KV.of(kv.key(), kv.iterator()))
>

This is dual to what I was clumsily trying to say in my last paragraph. But
I agree that ReduceByKey is better, if we were to add any new primitive
transform. I very much dislike PCollection<KV<K, Iterator<V>>> for just the
reasons you also mention.

I think the annotation route where @ProcessElement can accept a different
type of element seems less intrusive and more flexible.
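
A hypothetical sketch of that annotation route follows. Beam does not accept
an Iterator-typed element for GBK output today, so the signature below is an
assumption about how such an API could look, not existing behavior:

// Hypothetical: the Iterator element type would signal "one-shot" to the runner.
new DoFn<KV<String, Iterable<Integer>>, Long>() {
  @ProcessElement
  public void process(
      @Element KV<String, Iterator<Integer>> grouped, OutputReceiver<Long> out) {
    long count = 0;
    Iterator<Integer> values = grouped.getValue();
    while (values.hasNext()) {  // a single pass over the grouped values
      values.next();
      count++;
    }
    out.output(count);
  }
};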


> On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský  wrote:
>
>> The car analogy was meant to say, that in real world you have to make
>> decision before you take any action. There is no retroactivity possible.
>>
>> Reuven pointed out, that it is possible (although it seems a little weird
>> to me, but that is the only thing I can tell against it :-)), that the way
>> a grouped PCollection is produced might be out of control of a consuming
>> operator. One example of this might be, that the grouping is produced in a
>> submodule (some library), but still, the consumer wants to be able to
>> specify if he wants or doesn't want reiterations.
>>
Exactly. The person choosing to GroupByKey and the person writing the
one-shot ParDo must be assumed to be different people, in general.

FWIW I always think of the pipeline as a program and the runner as a
planner/optimizer. So it is always responsible for reordering and making
physical planning decisions, like whether to create an iterable
materialization or just some streamed iterator.

>> If we move on, our next option might be to specify the annotation on the
>> consumer (as suggested), but that has all the "not really nice" properties
>> of being counter-intuitive, ignoring strong types, etc., etc., for which
>> reason I think that this should be ruled out as well.
>>
In Beam we have taken the position that type checking the graph after it
is constructed is an early enough place to catch type errors (speaking for
Java). The validation that ParDo does on the DoFn is basically lightweight,
local, type checking. This is how we detect and type check stateful ParDo
transforms as well as splittable ParDo transforms. We also catch errors
that are not expressible in Java's type system.

If we were discussing just this Spark limitation/optimization this is a
very natural fit with what we already have: Give the runner all the
information about the nature of the transforms and user functions, and let
it make the best plan it can.

So to me the interesting part is that there is a DSL that wants to support
primitives that are strictly weaker than Beam's, in order to *only* allow
the oneshot path. Annotations are quite annoying for DSLs, as you may have
noticed for state & timers, so that is not a good fit. But the concepts
still work. I would suggest pivoting this thread into how to allow a DSL
builder to directly provide a DoFnInvoker with a DoFnSignature in order to
programmatically provide the same information that annotations are used for.
Essentially exposing an IR to DSL authors rather than forcing them to work
with the source language meant for end users. Do you already have a
solution for this today?

Kenn

>> This leaves us with a single option (at least I have not figured out any
>> other) - which is we can bundle GBK and associated ParDo into atomic
>> PTransform, which can then be overridden by runners that need special
>> handling of this situation - these are all runners that need buffer data to
>> memory in order to support reiterations (spark and flink, note that this
>> problem arises only for batch case, because in streaming case, one can
>> reasonably assume that the data resides in a state that supports
>> reiterations). But - we already have this PTransform in Euphoria, it is
>> called ReduceByKey, and has all the required properties (technically, it is
>> not a PTransform now, but that is a minor detail and can be changed
>> trivially).
>>
>> So, the direction I was trying to take this discussion was - what could
>> be the best way for a runner to natively support a PTransform from a DSL? I
>> can imagine several options:
>>
>>  a) support it directly and let runners depend on the DSL (compileOnly
>> dependency might suffice, because users will include the DSL into their
>> code to be able to use it)
>>
>>  b) create an interface in runners for user-code to be able to provide
>> translation for user-specified operators (this could be absolutely generic,
>> DSLs might just use this feature the same way any user could), after all
>> runners already use a concept of Translator, but that is pretty much
>> copy-pasted, not abstracted into a general purpose one
>>
>>  c) move the operators that need to be translated into core
>>
>> The option (c) then leaves open questions related to - if we would want
>> to move other operators to core, 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-03 Thread Reuven Lax
Ok - now I see what you're talking about. You are focusing on the Java
types in the Java SDK, where the output of GBK is an Iterable type (which
should always be reiterable). I was talking more abstractly about the
programming model, i.e. the portability representation of the graph.

In this case I think that Robert's suggestion for the Java SDK is the right
one. Create a new transform, and have runners optimize it away if necessary.

On Wed, Oct 2, 2019 at 2:19 AM Jan Lukavský  wrote:

>
> On 10/2/19 4:30 AM, Reuven Lax wrote:
>
>
>
> On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský  wrote:
>
>> > The fact that the annotation on the ParDo "changes" the GroupByKey
>> implementation is very specific to the Spark runner implementation.
>>
>> I don't quite agree. It is not very specific to Spark, it is specific to
>> generally all runners, that produce grouped elements in a way that is not
>> reiterable. That is the key property. The example you gave with HDFS does
>> not satisfy this condition (files on HDFS are certainly reiterable), and
>> that's why no change to the GBK is needed (it actually already has the
>> required property). A quick look at what FlinkRunner (at least non portable
>> does) is that it implements GBK using reducing elements into List. That is
>> going to crash on big PCollection, which is even nicely documented:
>>
>>* For internal use to translate {@link GroupByKey}. For a large {@link 
>> PCollection} this is
>>* expected to crash!
>>
>>
>> If this is fixed, then it is likely to start behave the same as Spark. So
>> actually I think the opposite is true - Dataflow is a special case, because
>> of how its internal shuffle service works.
>>
>
> I think you misunderstood - I was not trying to dish on the Spark runner.
> Rather my point is that whether the GroupByKey implementation is affected
> or not is runner dependent. In some runners it is and in others it isn't.
> However in all cases the *semantics* of the ParDo is affected. Since Beam
> tries as much as possible to be runner agnostic, we should default to
> making the change where there is an obvious semantic difference.
>
>
> I understand that, but I just don't think, that a semantics should be
> affected by this. If outcome of GBK is Iterable, then it should be
> reiterable, that is how Iterable works, so I now lean more towards a
> conclusion, that the current behavior of Spark runner simply breaks this
> contract. Solution of this would be to introduce the proposed
> GroupByKeyOneShot or Reduce(By|Per)Key.
>
>
>
> > In general I sympathize with the worry about non-local effects. Beam is
>> already full of them (e.g. a Window.into statement effects downstream
>> GroupByKeys). In each case where they were added there was extensive debate
>> and discussion (Windowing semantics were debated for many months), exactly
>> because there was concern over adding these non-local effects. In every
>> case, no other good solution could be found. For the case of windowing for
>> example, it was often easy to propose simple local APIs (e.g. just pass the
>> window fn as a parameter to GroupByKey), however all of these local
>> solutions ended up not working for important use cases when we analyzed
>> them more deeply.
>>
>> That is very interesting. Could you elaborate more about some examples of
>> the use cases which didn't work? I'd like to try to match it against how
>> Euphoria is structured, it should be more resistant to this non-local
>> effects, because it very often bundles together multiple Beam's primitives
>> to single transform - ReduceByKey is one example of this, if is actually
>> mix of Window.into() + GBK + ParDo, Although it might look like if this
>> transform can be broken down to something else, then it is not primitive
>> (euphoria has no native equivalent of GBK itself), but it has several other
>> nice implications - that is that Combine now becomes a special case of RBK.
>> It now becomes only a question of where and how you can "run" the reduce
>> function. The logic is absolutely equal. This can be worked in more detail
>> and actually show, that even Combine and RBK can be decribed by a more
>> general stateful operation (ReduceStateByKey), and so finally Euphoria
>> actually has only two really "primitive" operations - these are FlatMap
>> (basically stateless ParDo) and RSBK. As I already mentioned on some other
>> thread, when stateful ParDo would support merging windows, it can be shown
>> that both Combine and GBK become special cases of this.
>>
>> > As you mentioned below, I do think it's perfectly reasonable for a DSL
>> to impose its own semantics. Scio already does this - the raw Beam API is
>> used by a DSL as a substrate, but the DSL does not need to blindly mirror
>> the semantics of the raw Beam API - at least in my opinion!
>>
>> Sure, but currently, there is no way for DSL to "hook" into runner, so it
>> has to use raw Beam SDK, and so this will fail in cases like this - where
>> Beam 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-03 Thread Reuven Lax
Putting a stateful dofn after a GBK is not completely redundant - the
element type changes, so it is different from just having a stateful dofn.
However it is a weird thing to do, and usually not optimal (especially
because many runners might insert two shuffles in this case).

On Wed, Oct 2, 2019 at 2:24 AM Jan Lukavský  wrote:

> +1
>
> The difference between GroupByKeyOneShot and Reduce(By|Per)Key is probably
> only in that in the first case one can pass the result to a stateful ParDo.
> The latter has a more strict semantics, so that user is a little limited
> about what he can do with the result of the grouping. It seems to me,
> though, that applying a stateful operation on result of grouping makes
> little sense, because stateful operation performs this grouping (keying)
> automatically, so the preceding GroupByKeyOneShot would be somewhat
> redundant. But maybe someone can provide a different insight.
> On 10/2/19 2:34 AM, Robert Bradshaw wrote:
>
> For this specific usecase, I would suggest this be done via
> PTranform URNs. E.g. one could have a GroupByKeyOneShot whose
> implementation is
>
> input
> .apply(GroupByKey.of())
> .apply(kv -> KV.of(kv.key(), kv.iterator()))
>
> A runner would be free to recognize and optimize this in the graph (based
> on its urn) and swap out a more efficient implementation. Of course a
> Coder<Iterator<V>> would have to be introduced, and the semantics of
> PCollection<KV<K, Iterator<V>>> are a bit odd due to the inherently mutable
> nature of Iterators. (Possibly a ReducePerKey transform would be a better
> abstraction.)
>
>
> On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský  wrote:
>
>> The car analogy was meant to say, that in real world you have to make
>> decision before you take any action. There is no retroactivity possible.
>>
>> Reuven pointed out, that it is possible (although it seems a little weird
>> to me, but that is the only thing I can tell against it :-)), that the way
>> a grouped PCollection is produced might be out of control of a consuming
>> operator. One example of this might be, that the grouping is produced in a
>> submodule (some library), but still, the consumer wants to be able to
>> specify if he wants or doesn't want reiterations. There still is a
>> "classical" solution to this - the library might expose an interface to
>> specify a factory for the grouped PCollection, so that the user of the
>> library will be able to specify what he wants. But we can say, that we
>> don't want to force users (or authors of libraries) to do that. That's okay
>> for me.
>>
>> If we move on, our next option might be to specify the annotation on the
>> consumer (as suggested), but that has all the "not really nice" properties
>> of being counter-intuitive, ignoring strong types, etc., etc., for which
>> reason I think that this should be ruled out as well.
>>
>> This leaves us with a single option (at least I have not figured out any
>> other) - which is we can bundle GBK and associated ParDo into atomic
>> PTransform, which can then be overridden by runners that need special
>> handling of this situation - these are all runners that need buffer data to
>> memory in order to support reiterations (spark and flink, note that this
>> problem arises only for batch case, because in streaming case, one can
>> reasonably assume that the data resides in a state that supports
>> reiterations). But - we already have this PTransform in Euphoria, it is
>> called ReduceByKey, and has all the required properties (technically, it is
>> not a PTransform now, but that is a minor detail and can be changed
>> trivially).
>>
>> So, the direction I was trying to take this discussion was - what could
>> be the best way for a runner to natively support a PTransform from a DSL? I
>> can imagine several options:
>>
>>  a) support it directly and let runners depend on the DSL (compileOnly
>> dependency might suffice, because users will include the DSL into their
>> code to be able to use it)
>>
>>  b) create an interface in runners for user-code to be able to provide
>> translation for user-specified operators (this could be absolutely generic,
>> DSLs might just use this feature the same way any user could), after all
>> runners already use a concept of Translator, but that is pretty much
>> copy-pasted, not abstracted into a general purpose one
>>
>>  c) move the operators that need to be translated into core
>>
>> The option (c) then leaves open questions related to - if we would want
>> to move other operators to core, would this be the right time to ask
>> questions if our current set of "core" operators is the ideal one? Or could
>> this be optimized?
>>
>> Jan
>> On 10/1/19 12:32 AM, Kenneth Knowles wrote:
>>
>> In the car analogy, you have something like this:
>>
>> Iterable: car
>> Iterator: taxi ride
>>
>> They are related, but not as variations of a common concept.
>>
>> In the discussion of Combine vs RSBK, if the reducer is required to be an
>> associative and commutative operator, then it 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-02 Thread Jan Lukavský


On 10/2/19 4:30 AM, Reuven Lax wrote:



On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský wrote:


> The fact that the annotation on the ParDo "changes" the
GroupByKey implementation is very specific to the Spark runner
implementation.

I don't quite agree. It is not very specific to Spark, it is
specific to generally all runners, that produce grouped elements
in a way that is not reiterable. That is the key property. The
example you gave with HDFS does not satisfy this condition (files
on HDFS are certainly reiterable), and that's why no change to the
GBK is needed (it actually already has the required property). A
quick look at what FlinkRunner (at least non portable does) is
that it implements GBK using reducing elements into List. That is
going to crash on big PCollection, which is even nicely documented:

    * For internal use to translate {@link GroupByKey}. For a large 
{@link PCollection} this is
    * expected to crash!

If this is fixed, then it is likely to start behave the same as
Spark. So actually I think the opposite is true - Dataflow is a
special case, because of how its internal shuffle service works.


I think you misunderstood - I was not trying to dish on the Spark 
runner. Rather my point is that whether the GroupByKey implementation 
is affected or not is runner dependent. In some runners it is and in 
others it isn't. However in all cases the /semantics/ of the ParDo is 
affected. Since Beam tries as much as possible to be runner agnostic, 
we should default to making the change where there is an obvious 
semantic difference.



I understand that, but I just don't think that semantics should be 
affected by this. If the outcome of GBK is an Iterable, then it should be 
reiterable - that is how Iterable works - so I now lean more towards the 
conclusion that the current behavior of the Spark runner simply breaks this 
contract. A solution would be to introduce the proposed 
GroupByKeyOneShot or Reduce(By|Per)Key.





> In general I sympathize with the worry about non-local effects.
Beam is already full of them (e.g. a Window.into statement effects
downstream GroupByKeys). In each case where they were added there
was extensive debate and discussion (Windowing semantics were
debated for many months), exactly because there was concern over
adding these non-local effects. In every case, no other good
solution could be found. For the case of windowing for example, it
was often easy to propose simple local APIs (e.g. just pass the
window fn as a parameter to GroupByKey), however all of these
local solutions ended up not working for important use cases when
we analyzed them more deeply.

That is very interesting. Could you elaborate more about some
examples of the use cases which didn't work? I'd like to try to
match it against how Euphoria is structured, it should be more
resistant to this non-local effects, because it very often bundles
together multiple Beam's primitives to single transform -
ReduceByKey is one example of this, if is actually mix of
Window.into() + GBK + ParDo, Although it might look like if this
transform can be broken down to something else, then it is not
primitive (euphoria has no native equivalent of GBK itself), but
it has several other nice implications - that is that Combine now
becomes a special case of RBK. It now becomes only a question of
where and how you can "run" the reduce function. The logic is
absolutely equal. This can be worked in more detail and actually
show, that even Combine and RBK can be decribed by a more general
stateful operation (ReduceStateByKey), and so finally Euphoria
actually has only two really "primitive" operations - these are
FlatMap (basically stateless ParDo) and RSBK. As I already
mentioned on some other thread, when stateful ParDo would support
merging windows, it can be shown that both Combine and GBK become
special cases of this.

> As you mentioned below, I do think it's perfectly reasonable for a
DSL to impose its own semantics. Scio already does this - the raw
Beam API is used by a DSL as a substrate, but the DSL does not
need to blindly mirror the semantics of the raw Beam API - at
least in my opinion!

Sure, but currently, there is no way for DSL to "hook" into
runner, so it has to use raw Beam SDK, and so this will fail in
cases like this - where Beam actually has stronger guarantees than
it is required by the DSL. It would be cool if we could find a way
to do that - this pretty much aligns with another question raised
on ML, about the possibility to override a default implementation
of a PTransform for specific pipeline.

Jan


On 9/29/19 7:46 PM, Reuven Lax wrote:

Jan,

The fact that the annotation on the ParDo "changes" the
GroupByKey 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-01 Thread Reuven Lax
On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský  wrote:

> > The fact that the annotation on the ParDo "changes" the GroupByKey
> implementation is very specific to the Spark runner implementation.
>
> I don't quite agree. It is not very specific to Spark, it is specific to
> generally all runners, that produce grouped elements in a way that is not
> reiterable. That is the key property. The example you gave with HDFS does
> not satisfy this condition (files on HDFS are certainly reiterable), and
> that's why no change to the GBK is needed (it actually already has the
> required property). A quick look at what FlinkRunner (at least non portable
> does) is that it implements GBK using reducing elements into List. That is
> going to crash on big PCollection, which is even nicely documented:
>
>* For internal use to translate {@link GroupByKey}. For a large {@link 
> PCollection} this is
>* expected to crash!
>
>
> If this is fixed, then it is likely to start behave the same as Spark. So
> actually I think the opposite is true - Dataflow is a special case, because
> of how its internal shuffle service works.
>

I think you misunderstood - I was not trying to dish on the Spark runner.
Rather my point is that whether the GroupByKey implementation is affected
or not is runner dependent. In some runners it is and in others it isn't.
However in all cases the *semantics* of the ParDo is affected. Since Beam
tries as much as possible to be runner agnostic, we should default to
making the change where there is an obvious semantic difference.

> In general I sympathize with the worry about non-local effects. Beam is
> already full of them (e.g. a Window.into statement effects downstream
> GroupByKeys). In each case where they were added there was extensive debate
> and discussion (Windowing semantics were debated for many months), exactly
> because there was concern over adding these non-local effects. In every
> case, no other good solution could be found. For the case of windowing for
> example, it was often easy to propose simple local APIs (e.g. just pass the
> window fn as a parameter to GroupByKey), however all of these local
> solutions ended up not working for important use cases when we analyzed
> them more deeply.
>
> That is very interesting. Could you elaborate more about some examples of
> the use cases which didn't work? I'd like to try to match it against how
> Euphoria is structured, it should be more resistant to this non-local
> effects, because it very often bundles together multiple Beam's primitives
> to single transform - ReduceByKey is one example of this, if is actually
> mix of Window.into() + GBK + ParDo, Although it might look like if this
> transform can be broken down to something else, then it is not primitive
> (euphoria has no native equivalent of GBK itself), but it has several other
> nice implications - that is that Combine now becomes a special case of RBK.
> It now becomes only a question of where and how you can "run" the reduce
> function. The logic is absolutely equal. This can be worked in more detail
> and actually show, that even Combine and RBK can be decribed by a more
> general stateful operation (ReduceStateByKey), and so finally Euphoria
> actually has only two really "primitive" operations - these are FlatMap
> (basically stateless ParDo) and RSBK. As I already mentioned on some other
> thread, when stateful ParDo would support merging windows, it can be shown
> that both Combine and GBK become special cases of this.
>
> > As you mentioned below, I do think it's perfectly reasonable for a DSL
> to impose its own semantics. Scio already does this - the raw Beam API is
> used by a DSL as a substrate, but the DSL does not need to blindly mirror
> the semantics of the raw Beam API - at least in my opinion!
>
> Sure, but currently, there is no way for DSL to "hook" into runner, so it
> has to use raw Beam SDK, and so this will fail in cases like this - where
> Beam actually has stronger guarantees than it is required by the DSL. It
> would be cool if we could find a way to do that - this pretty much aligns
> with another question raised on ML, about the possibility to override a
> default implementation of a PTransform for specific pipeline.
>
> Jan
>
>
> On 9/29/19 7:46 PM, Reuven Lax wrote:
>
> Jan,
>
> The fact that the annotation on the ParDo "changes" the GroupByKey
> implementation is very specific to the Spark runner implementation. You can
> imagine another runner that simply writes out files in HDFS to implement a
> GroupByKey - this GroupByKey implementation is agnostic whether the result
> will be reiterated or not; in this case it is very much the ParDo
> implementation that changes to implement a reiterable. I think you don't
> like the fact that an annotation on the ParDo will have a non-local effect
> on the implementation of the GroupByKey upstream. However arguably the
> non-local effect is just a quirk of how the Spark runner is implemented -
> other 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-01 Thread Robert Bradshaw
For this specific usecase, I would suggest this be done via PTranform URNs.
E.g. one could have a GroupByKeyOneShot whose implementation is

input
.apply(GroupByKey.of())
.apply(kv -> KV.of(kv.key(), kv.iterator()))

A runner would be free to recognize and optimize this in the graph (based
on its urn) and swap out a more efficient implementation. Of course a
Coder<Iterator<V>> would have to be introduced, and the semantics of
PCollection<KV<K, Iterator<V>>> are a bit odd due to the inherently mutable
nature of Iterators. (Possibly a ReducePerKey transform would be a better
abstraction.)
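
Spelled out as a composite (and ignoring the Coder<Iterator<V>> question
above), such a transform might look roughly like the following sketch;
GroupByKeyOneShot is the hypothetical transform under discussion, not
existing API:

// Hypothetical composite; a runner could match it by URN and substitute a
// native, non-reiterable grouping.
class GroupByKeyOneShot<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterator<V>>>> {
  @Override
  public PCollection<KV<K, Iterator<V>>> expand(PCollection<KV<K, V>> input) {
    return input
        .apply(GroupByKey.create())
        .apply(MapElements.via(
            new SimpleFunction<KV<K, Iterable<V>>, KV<K, Iterator<V>>>() {
              @Override
              public KV<K, Iterator<V>> apply(KV<K, Iterable<V>> kv) {
                return KV.of(kv.getKey(), kv.getValue().iterator());
              }
            }));
  }
}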


On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský  wrote:

> The car analogy was meant to say, that in real world you have to make
> decision before you take any action. There is no retroactivity possible.
>
> Reuven pointed out, that it is possible (although it seems a little weird
> to me, but that is the only thing I can tell against it :-)), that the way
> a grouped PCollection is produced might be out of control of a consuming
> operator. One example of this might be, that the grouping is produced in a
> submodule (some library), but still, the consumer wants to be able to
> specify if he wants or doesn't want reiterations. There still is a
> "classical" solution to this - the library might expose an interface to
> specify a factory for the grouped PCollection, so that the user of the
> library will be able to specify what he wants. But we can say, that we
> don't want to force users (or authors of libraries) to do that. That's okay
> for me.
>
> If we move on, our next option might be to specify the annotation on the
> consumer (as suggested), but that has all the "not really nice" properties
> of being counter-intuitive, ignoring strong types, etc., etc., for which
> reason I think that this should be ruled out as well.
>
> This leaves us with a single option (at least I have not figured out any
> other) - which is we can bundle GBK and associated ParDo into atomic
> PTransform, which can then be overridden by runners that need special
> handling of this situation - these are all runners that need buffer data to
> memory in order to support reiterations (spark and flink, note that this
> problem arises only for batch case, because in streaming case, one can
> reasonably assume that the data resides in a state that supports
> reiterations). But - we already have this PTransform in Euphoria, it is
> called ReduceByKey, and has all the required properties (technically, it is
> not a PTransform now, but that is a minor detail and can be changed
> trivially).
>
> So, the direction I was trying to take this discussion was - what could be
> the best way for a runner to natively support a PTransform from a DSL? I
> can imagine several options:
>
>  a) support it directly and let runners depend on the DSL (compileOnly
> dependency might suffice, because users will include the DSL into their
> code to be able to use it)
>
>  b) create an interface in runners for user-code to be able to provide
> translation for user-specified operators (this could be absolutely generic,
> DSLs might just use this feature the same way any user could), after all
> runners already use a concept of Translator, but that is pretty much
> copy-pasted, not abstracted into a general purpose one
>
>  c) move the operators that need to be translated into core
>
> The option (c) then leaves open questions related to - if we would want to
> move other operators to core, would this be the right time to ask questions
> if our current set of "core" operators is the ideal one? Or could this be
> optimized?
>
> Jan
> On 10/1/19 12:32 AM, Kenneth Knowles wrote:
>
> In the car analogy, you have something like this:
>
> Iterable: car
> Iterator: taxi ride
>
> They are related, but not as variations of a common concept.
>
> In the discussion of Combine vs RSBK, if the reducer is required to be an
> associative and commutative operator, then it is the same thing under a
> different name. If the reducer can be non-associative or non-commutative,
> then it admits fewer transformations/optimizations.
>
> If you introduce a GroupIteratorsByKey and implement GroupByKey as a
> transform that combines the iterator by concatenation, I think you do get
> an internally consistent system. To execute efficiently, you need to always
> identify and replace the GroupByKey operation with a primitive one. It does
> make some sense to expose the weakest primitives for the sake of DSLs. But
> they are very poorly suited for end-users, and for GBK on most runners you
> get the more powerful one for free.
>
> Kenn
>
> On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský  wrote:
>
>> > The fact that the annotation on the ParDo "changes" the GroupByKey
>> implementation is very specific to the Spark runner implementation.
>>
>> I don't quite agree. It is not very specific to Spark, it is specific to
>> generally all runners, that produce grouped elements in a way that is not
>> reiterable. That is the key property. The 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-01 Thread Jan Lukavský
The car analogy was meant to say that in the real world you have to make a 
decision before you take any action. There is no retroactivity possible.


Reuven pointed out that it is possible (although it seems a little 
weird to me, but that is the only thing I can say against it :-)) that 
the way a grouped PCollection is produced might be out of the control of a 
consuming operator. One example of this might be that the grouping is 
produced in a submodule (some library), but the consumer still wants to 
be able to specify whether or not he wants reiterations. There is still 
a "classical" solution to this - the library might expose an 
interface to specify a factory for the grouped PCollection, so that the 
user of the library will be able to specify what he wants. But we can 
say that we don't want to force users (or authors of libraries) to do 
that. That's okay for me.


If we move on, our next option might be to specify the annotation on the 
consumer (as suggested), but that has all the "not really nice" 
properties of being counter-intuitive, ignoring strong types, etc., 
etc., for which reason I think that this should be ruled out as well.


This leaves us with a single option (at least I have not figured out any 
other) - we can bundle the GBK and the associated ParDo into an atomic 
PTransform, which can then be overridden by runners that need special 
handling of this situation - these are all runners that need to buffer data 
in memory in order to support reiterations (Spark and Flink; note that 
this problem arises only in the batch case, because in the streaming case 
one can reasonably assume that the data resides in a state that supports 
reiterations). But we already have this PTransform in Euphoria - it is 
called ReduceByKey, and it has all the required properties (technically, it 
is not a PTransform now, but that is a minor detail and can be changed 
trivially).
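
To make the shape of such an atomic transform concrete, here is a minimal 
sketch. It is not the existing Euphoria ReduceByKey API, just an illustration 
of a primitive whose reduce function sees each group exactly once, so a runner 
never has to materialize a reiterable:

// Hypothetical primitive: the reducer gets a one-shot Iterator, so runners
// that cannot cheaply re-iterate grouped values (Spark/Flink batch) can
// translate it directly, while other runners may expand it to GBK + ParDo.
abstract class ReducePerKey<K, V, OutT>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, OutT>>> {

  // Called once per key (and window) with a single-pass view of the values.
  protected abstract OutT reduce(K key, Iterator<V> values);
}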


So, the direction I was trying to take this discussion was - what could 
be the best way for a runner to natively support a PTransform from a 
DSL? I can imagine several options:


 a) support it directly and let runners depend on the DSL (compileOnly 
dependency might suffice, because users will include the DSL into their 
code to be able to use it)


 b) create an interface in runners for user-code to be able to provide 
translation for user-specified operators (this could be absolutely 
generic, DSLs might just use this feature the same way any user could), 
after all runners already use a concept of Translator, but that is 
pretty much copy-pasted, not abstracted into a general purpose one


 c) move the operators that need to be translated into core

The option (c) then leaves open questions related to - if we would want 
to move other operators to core, would this be the right time to ask 
questions if our current set of "core" operators is the ideal one? Or 
could this be optimized?


Jan

On 10/1/19 12:32 AM, Kenneth Knowles wrote:

In the car analogy, you have something like this:

    Iterable: car
    Iterator: taxi ride

They are related, but not as variations of a common concept.

In the discussion of Combine vs RSBK, if the reducer is required to be 
an associative and commutative operator, then it is the same thing 
under a different name. If the reducer can be non-associative or 
non-commutative, then it admits fewer transformations/optimizations.


If you introduce a GroupIteratorsByKey and implement GroupByKey as a 
transform that combines the iterator by concatenation, I think you do 
get an internally consistent system. To execute efficiently, you need 
to always identify and replace the GroupByKey operation with a 
primitive one. It does make some sense to expose the weakest 
primitives for the sake of DSLs. But they are very poorly suited for 
end-users, and for GBK on most runners you get the more powerful one 
for free.


Kenn

On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský wrote:


> The fact that the annotation on the ParDo "changes" the
GroupByKey implementation is very specific to the Spark runner
implementation.

I don't quite agree. It is not very specific to Spark, it is
specific to generally all runners, that produce grouped elements
in a way that is not reiterable. That is the key property. The
example you gave with HDFS does not satisfy this condition (files
on HDFS are certainly reiterable), and that's why no change to the
GBK is needed (it actually already has the required property). A
quick look at what FlinkRunner (at least non portable does) is
that it implements GBK using reducing elements into List. That is
going to crash on big PCollection, which is even nicely documented:

    * For internal use to translate {@link GroupByKey}. For a large 
{@link PCollection} this is
    * expected to crash!

If this is fixed, then it is likely to start behave the same as
Spark. So actually I think the opposite is true - 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-30 Thread Kenneth Knowles
In the car analogy, you have something like this:

Iterable: car
Iterator: taxi ride

They are related, but not as variations of a common concept.

In the discussion of Combine vs RSBK, if the reducer is required to be an
associative and commutative operator, then it is the same thing under a
different name. If the reducer can be non-associative or non-commutative,
then it admits fewer transformations/optimizations.

If you introduce a GroupIteratorsByKey and implement GroupByKey as a
transform that combines the iterator by concatenation, I think you do get
an internally consistent system. To execute efficiently, you need to always
identify and replace the GroupByKey operation with a primitive one. It does
make some sense to expose the weakest primitives for the sake of DSLs. But
they are very poorly suited for end-users, and for GBK on most runners you
get the more powerful one for free.
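
A rough sketch of that layering, assuming a hypothetical GroupIteratorsByKey
primitive (it does not exist in Beam):

// GroupByKey expressed on top of the weaker primitive by materializing each
// one-shot iterator into a reiterable list. A runner would normally replace
// the whole composite with its native, more powerful GBK.
PCollection<KV<String, Iterable<Integer>>> grouped =
    input
        .apply(new GroupIteratorsByKey<String, Integer>())  // hypothetical
        .apply(MapElements.via(
            new SimpleFunction<KV<String, Iterator<Integer>>,
                               KV<String, Iterable<Integer>>>() {
              @Override
              public KV<String, Iterable<Integer>> apply(
                  KV<String, Iterator<Integer>> kv) {
                return KV.of(kv.getKey(), ImmutableList.copyOf(kv.getValue()));
              }
            }));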

Kenn

On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský  wrote:

> > The fact that the annotation on the ParDo "changes" the GroupByKey
> implementation is very specific to the Spark runner implementation.
>
> I don't quite agree. It is not very specific to Spark, it is specific to
> generally all runners, that produce grouped elements in a way that is not
> reiterable. That is the key property. The example you gave with HDFS does
> not satisfy this condition (files on HDFS are certainly reiterable), and
> that's why no change to the GBK is needed (it actually already has the
> required property). A quick look at what FlinkRunner (at least non portable
> does) is that it implements GBK using reducing elements into List. That is
> going to crash on big PCollection, which is even nicely documented:
>
>* For internal use to translate {@link GroupByKey}. For a large {@link 
> PCollection} this is
>* expected to crash!
>
>
> If this is fixed, then it is likely to start behave the same as Spark. So
> actually I think the opposite is true - Dataflow is a special case, because
> of how its internal shuffle service works.
>
> > In general I sympathize with the worry about non-local effects. Beam is
> already full of them (e.g. a Window.into statement effects downstream
> GroupByKeys). In each case where they were added there was extensive debate
> and discussion (Windowing semantics were debated for many months), exactly
> because there was concern over adding these non-local effects. In every
> case, no other good solution could be found. For the case of windowing for
> example, it was often easy to propose simple local APIs (e.g. just pass the
> window fn as a parameter to GroupByKey), however all of these local
> solutions ended up not working for important use cases when we analyzed
> them more deeply.
>
> That is very interesting. Could you elaborate more about some examples of
> the use cases which didn't work? I'd like to try to match it against how
> Euphoria is structured, it should be more resistant to this non-local
> effects, because it very often bundles together multiple Beam's primitives
> to single transform - ReduceByKey is one example of this, if is actually
> mix of Window.into() + GBK + ParDo, Although it might look like if this
> transform can be broken down to something else, then it is not primitive
> (euphoria has no native equivalent of GBK itself), but it has several other
> nice implications - that is that Combine now becomes a special case of RBK.
> It now becomes only a question of where and how you can "run" the reduce
> function. The logic is absolutely equal. This can be worked in more detail
> and actually show, that even Combine and RBK can be decribed by a more
> general stateful operation (ReduceStateByKey), and so finally Euphoria
> actually has only two really "primitive" operations - these are FlatMap
> (basically stateless ParDo) and RSBK. As I already mentioned on some other
> thread, when stateful ParDo would support merging windows, it can be shown
> that both Combine and GBK become special cases of this.
>
> > As you mentioned below, I do think it's perfectly reasonable for a DSL
> to impose its own semantics. Scio already does this - the raw Beam API is
> used by a DSL as a substrate, but the DSL does not need to blindly mirror
> the semantics of the raw Beam API - at least in my opinion!
>
> Sure, but currently, there is no way for DSL to "hook" into runner, so it
> has to use raw Beam SDK, and so this will fail in cases like this - where
> Beam actually has stronger guarantees than it is required by the DSL. It
> would be cool if we could find a way to do that - this pretty much aligns
> with another question raised on ML, about the possibility to override a
> default implementation of a PTransform for specific pipeline.
>
> Jan
>
>
> On 9/29/19 7:46 PM, Reuven Lax wrote:
>
> Jan,
>
> The fact that the annotation on the ParDo "changes" the GroupByKey
> implementation is very specific to the Spark runner implementation. You can
> imagine another runner that simply writes out 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-30 Thread Jan Lukavský
> The fact that the annotation on the ParDo "changes" the GroupByKey 
implementation is very specific to the Spark runner implementation.


I don't quite agree. It is not very specific to Spark; it is specific to 
generally all runners that produce grouped elements in a way that is 
not reiterable. That is the key property. The example you gave with HDFS 
does not satisfy this condition (files on HDFS are certainly 
reiterable), and that's why no change to the GBK is needed (it actually 
already has the required property). A quick look at FlinkRunner (at 
least the non-portable one) shows that it implements GBK by reducing 
elements into a List. That is going to crash on a big PCollection, which is 
even nicely documented:


* For internal use to translate {@link GroupByKey}. For a large {@link 
PCollection} this is
* expected to crash!

If this is fixed, then it is likely to start behaving the same as Spark. 
So actually I think the opposite is true - Dataflow is a special case, 
because of how its internal shuffle service works.


 In general I sympathize with the worry about non-local effects. Beam 
is already full of them (e.g. a Window.into statement effects downstream 
GroupByKeys). In each case where they were added there was extensive 
debate and discussion (Windowing semantics were debated for many 
months), exactly because there was concern over adding these non-local 
effects. In every case, no other good solution could be found. For the 
case of windowing for example, it was often easy to propose simple local 
APIs (e.g. just pass the window fn as a parameter to GroupByKey), 
however all of these local solutions ended up not working for important 
use cases when we analyzed them more deeply.


That is very interesting. Could you elaborate more about some examples 
of the use cases which didn't work? I'd like to try to match them against 
how Euphoria is structured. It should be more resistant to these 
non-local effects, because it very often bundles multiple Beam 
primitives into a single transform - ReduceByKey is one example of 
this; it is actually a mix of Window.into() + GBK + ParDo. Although it 
might look like this transform is not primitive because it can be broken 
down into something else (Euphoria has no native equivalent of GBK 
itself), it has several other nice implications - for instance, Combine 
now becomes a special case of RBK. It then becomes only a question of 
where and how you can "run" the reduce function; the logic is absolutely 
the same. This can be worked out in more detail to show that even 
Combine and RBK can be described by a more general stateful operation 
(ReduceStateByKey), so finally Euphoria actually has only two truly 
"primitive" operations - FlatMap (basically a stateless ParDo) and RSBK. 
As I already mentioned on another thread, if stateful ParDo supported 
merging windows, it could be shown that both Combine and GBK become 
special cases of it.


 As you mentioned below, I do think it's perfectly reasonable for a DSL 
to impose its own semantics. Scio already does this - the raw Beam API 
is used by a DSL as a substrate, but the DSL does not need to blindly 
mirror the semantics of the raw Beam API - at least in my opinion!


Sure, but currently there is no way for a DSL to "hook" into the runner, so 
it has to use the raw Beam SDK, and so this will fail in cases like this - 
where Beam actually has stronger guarantees than are required by the 
DSL. It would be cool if we could find a way to do that - this pretty 
much aligns with another question raised on the ML, about the possibility to 
override the default implementation of a PTransform for a specific pipeline.


Jan


On 9/29/19 7:46 PM, Reuven Lax wrote:

Jan,

The fact that the annotation on the ParDo "changes" the GroupByKey 
implementation is very specific to the Spark runner implementation. 
You can imagine another runner that simply writes out files in HDFS to 
implement a GroupByKey - this GroupByKey implementation is agnostic 
whether the result will be reiterated or not; in this case it is very 
much the ParDo implementation that changes to implement a reiterable. 
I think you don't like the fact that an annotation on the ParDo will 
have a non-local effect on the implementation of the GroupByKey 
upstream. However arguably the non-local effect is just a quirk of how 
the Spark runner is implemented - other runners might have a local effect.


In general I sympathize with the worry about non-local effects. Beam 
is already full of them (e.g. a Window.into statement effects 
downstream GroupByKeys). In each case where they were added there was 
extensive debate and discussion (Windowing semantics were debated for 
many months), exactly because there was concern over adding these 
non-local effects. In every case, no other good solution could be 
found. For the case of windowing for example, it was often easy to 
propose simple local APIs (e.g. just pass the window fn 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-29 Thread Reuven Lax
Jan,

The fact that the annotation on the ParDo "changes" the GroupByKey
implementation is very specific to the Spark runner implementation. You can
imagine another runner that simply writes out files in HDFS to implement a
GroupByKey - this GroupByKey implementation is agnostic as to whether the
result will be reiterated or not; in this case it is very much the ParDo
implementation that changes to implement a reiterable. I think you don't
like the fact that an annotation on the ParDo will have a non-local effect
on the implementation of the GroupByKey upstream. However arguably the
non-local effect is just a quirk of how the Spark runner is implemented -
other runners might have a local effect.

In general I sympathize with the worry about non-local effects. Beam is
already full of them (e.g. a Window.into statement affects downstream
GroupByKeys). In each case where they were added there was extensive debate
and discussion (Windowing semantics were debated for many months), exactly
because there was concern over adding these non-local effects. In every
case, no other good solution could be found. For the case of windowing for
example, it was often easy to propose simple local APIs (e.g. just pass the
window fn as a parameter to GroupByKey), however all of these local
solutions ended up not working for important use cases when we analyzed
them more deeply.
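
A concrete example of that windowing effect with the standard Java SDK,
assuming input is a PCollection<KV<String, Integer>>:

PCollection<KV<String, Iterable<Integer>>> grouped =
    input
        // Non-local effect: this statement changes what the GroupByKey below
        // does - values are now grouped per 1-minute window, not globally.
        .apply(Window.<KV<String, Integer>>into(
            FixedWindows.of(Duration.standardMinutes(1))))
        .apply(GroupByKey.create());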

As you mentioned below, I do think it's perfectly reasonable for a DSL to
impose its own semantics. Scio already does this - the raw Beam API is used
by a DSL as a substrate, but the DSL does not need to blindly mirror the
semantics of the raw Beam API - at least in my opinion!

Reuven

On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský  wrote:

> I understand the concerns. Still, it looks a little like we want to be
> able to modify behavior of an object from inside a submodule - quite like
> if my subprogram would accept a Map interface, but internally I would say
> "hey, this is supposed to be a HashMap, please change it so". Because of
> how pipeline is constructed, we can do that, the question is if there
> really isn't a better solution.
>
> What I do not like about the proposed solution:
>
>  1) to specify that the grouped elements are supposed to be iterated only
> once can be done only on ParDo, although there are other (higher level)
> PTransforms, that can consume output of GBK
>
>  2) the annontation on ParDo is by definition generic - i.e. can be used
> on input which is not output of GBK, which makes no sense
>
>  3) we modify the behavior to unlock some optimizations (or change of
> behavior of the GBK itself), users will not understand that
>
>  4) the annotation somewhat arbitrarily modifies data types passed, that
> is counter-intuitive and will be source of confusion
>
> I think that a solution that solves the above problems (but brings somoe
> new, as always :-)), could be to change the output of GBK from
> PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That way we can
> control which operators (and how) consume the grouping, and we can enable
> these transforms to specify additional parameters (like how they want to
> consume the grouping). It is obviously a breaking change (although can be
> probably made backwards compatible) and it would very much likely involve a
> substantial work. But maybe there are some other not yet discussed options.
>
> Jan
> On 9/28/19 6:46 AM, Reuven Lax wrote:
>
> In many cases, the writer of the ParDo has no access to the GBK (e.g. the
> GBK is hidden inside an upstream PTransform that they cannot modify). This
> is the same reason why RequiresStableInput was made a property of the
> ParDo, because the GroupByKey is quite often inaccessible.
>
> The car analogy doesn't quite apply here, because the runner does have a
> full view of the pipeline so can satisfy all constraints. The car
> dealership generally cannot read your mind (thankfully!), so you have to
> specify what you want. Or to put it another way, the various transforms in
> a Beam pipeline do not live in isolation. The full pipeline graph is what
> is executed, and the runner already has to analyze the full graph to run
> the pipeline (as well as to optimize the pipeline).
>
> Reuven
>
> On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský  wrote:
>
>> I'd suggest Stream instead of Iterator, it has the same semantics and
>> much better API.
>>
>> Still not sure, what is wrong on letting the GBK to decide this. I have
>> an analogy - if I decide to buy a car, I have to decide *what* car I'm
>> going to buy (by think about how I'm going to use it) *before* I buy it. I
>> cannot just buy "a car" and then change it from minivan to sport car based
>> on my actual need. Same with the GBK - if I want to be able to reiterate
>> the result, then I should tell it in advance.
>>
>> Jan
>> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>>
>> Good point about sibling fusion requiring this.
>>
>> The type PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
>> already does imply that the output iterable can be iterated 

RE: Multiple iterations after GroupByKey with SparkRunner

2019-09-29 Thread Gershi, Noam
Hi,

Thanx for the reply.

So – re-iteration on grouped elements is runner-dependent. Flink & Dataflow 
allow it, while Spark doesn't.

Since we are investigating the runners here as well, does anyone have a list of 
which runners allow / do not allow re-iteration?

Noam


From: Kenneth Knowles
Sent: Friday, September 27, 2019 7:26 PM
To: dev
Cc: user
Subject: Re: Multiple iterations after GroupByKey with SparkRunner

I am pretty surprised that we do not have a @Category(ValidatesRunner) test in 
GroupByKeyTest that iterates multiple times. That is a major oversight. We 
should have this test, and it can be disabled by the SparkRunner's 
configuration.

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
The Dataflow version does not spill to disk. However Spark's design might 
require spilling to disk if you want that to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
Hi,

Spark's GBK is currently implemented using `sortBy(key and 
value).mapPartition(...)` for non-merging windowing in order to support large 
keys and large scale shuffles. Merging windowing is implemented using standard 
GBK (underlying spark impl. uses ListCombiner + Hash Grouping), which is by 
design unable to support large keys.

As Jan noted, the problem with mapPartition is that its UDF receives an 
Iterator. The only option here is to wrap this iterator in one that spills to 
disk once an internal buffer is exceeded (the approach suggested by Reuven). 
This unfortunately comes with a cost in some cases. The best approach would be 
to somehow determine that the user wants multiple iterations and then wrap it 
in a "re-iterator" only if necessary. Does anyone have any ideas how to 
approach this?

D.

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax wrote:
The Beam API was written to support multiple iterations, and there are 
definitely transforms that do so. I believe that CoGroupByKey may do this as 
well with the resulting iterator.

I know that the Dataflow runner is able to handle iterators larger than
available memory by paging them in from shuffle, which still allows for 
reiterating. It sounds like Spark is less flexible here?

Reuven

On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský wrote:

+dev <mailto:dev@beam.apache.org>

Lukasz, why do you think that users expect to be able to iterate grouped
elements multiple times? Besides the fact that the 'Iterable' obviously
suggests it? The way that Spark behaves is pretty much analogous to how
MapReduce used to work - in certain cases it calls
repartitionAndSortWithinPartitions and then does mapPartition, which accepts an
Iterator - that is because internally it merge sorts pre-sorted segments. This
approach makes it possible to GroupByKey data sets that are too big to fit into
memory (per key).

If multiple iterations should be expected by users, we probably should:

 a) include that in @ValidatesRunner tests

 b) store values in memory on spark, which will break for certain pipelines

Because of (b) I think that it would be much better to remove this 
"expectation" and clearly document that the Iterable is not supposed to be 
iterated multiple times.

Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:

I pretty much think so, because that is how Spark works. The Iterable inside is 
really an Iterator, which cannot be iterated multiple times.

Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to iterate the GBK output multiple times 
even from within the same ParDo.
Is this something that Beam on Spark Runner never supported?

On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský 
mailto:je...@seznam.cz>> wrote:

Hi Gershi,

could you please outline the pipeline you are trying to execute? Basically, you 
cannot iterate the Iterable multiple times in a single ParDo. It should be
possible, though, to apply multiple ParDos to output from GroupByKey.

Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:
Hi,

I want to iterate multiple times on the Iterable (the output of GroupByKey 
transformation)
When my Runner is SparkRunner, I get an exception:

Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated
more than once, otherwise there could be data lost
at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
at java.lang.Iterable.spliterator(Iterable.java)
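
For reference, a minimal pipeline shape that triggers this - class name and
values are made up; run it with --runner=SparkRunner to reproduce, while
runners that keep the grouped values re-iterable accept the second pass:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

public class ReiterateAfterGbk {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)))
        .apply(GroupByKey.<String, Integer>create())
        .apply(ParDo.of(
            new DoFn<KV<String, Iterable<Integer>>, String>() {
              @ProcessElement
              public void process(ProcessContext c) {
                long count = 0;
                for (Integer ignored : c.element().getValue()) { // first pass
                  count++;
                }
                long sum = 0;
                // Second pass over the same Iterable - this is what the
                // SparkRunner's ValueIterator rejects.
                for (Integer v : c.element().getValue()) {
                  sum += v;
                }
                c.output(c.element().getKey() + " count=" + count + " sum=" + sum);
              }
            }));

    p.run().waitUntilFinish();
  }
}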

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-28 Thread Jan Lukavský
Thinking about it a little more - maybe there could be another
systematic approach to the described problems. We (currently) have multiple
layers of SDKs/DSLs. It is imaginable that each layer can have somewhat
different requirements and guarantees - actually, this might even be
logical, as each "layer" should serve a somewhat different purpose. I
have always thought about it as the SDK being the most generic, highest
level of abstraction. That is implied by the fact that DSLs should be
expandable into "primitive" transforms from the SDK.


On the other hand, it is absolutely imaginable that the SDK and DSLs be
somewhat complementary. It would require DSLs to provide default
expansions, but to override them in runners that do not satisfy the
requirements (the Spark runner in this case).


What I mean by that - in Euphoria, we have a concept called ReduceByKey,
which is essentially a chain of GroupByKey and ParDo. It is already
written to use java.util.stream.Stream, therefore it cannot confuse users
regarding multiple iterations. What would be needed is to directly
override this PTransform in the SparkRunner. This override would then use
the specific optimization that is currently responsible for not being
able to reiterate the result of GBK. It would mean that if the user wants
to reiterate, or is okay with using the default Spark implementation,
which stores grouped elements in memory, they could use the raw SDK (GBK
+ ParDo). If the user wanted to make use of the optimization for large
keys, they would have to use Euphoria's RBK. It would be a change to the
pipeline, but pretty much a drop-in replacement for a chained GBK +
ParDo. I think this solves all the issues and can be easily documented
and explained to users.
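
To make the shape concrete, a rough sketch - this is not the actual Euphoria
API, just an illustration of a composite whose default expansion is GBK +
ParDo and whose user function sees a Stream, so it can consume the group only
once by construction. The names (StreamReduceByKey, Reducer) are made up:

import java.io.Serializable;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical DSL-level composite: reduce the values of each key via a Stream. */
class StreamReduceByKey<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {

  /** Serializable binary reducer, standing in for the DSL's user function. */
  interface Reducer<V> extends Serializable {
    V apply(V a, V b);
  }

  private final Reducer<V> reducer;

  StreamReduceByKey(Reducer<V> reducer) {
    this.reducer = reducer;
  }

  @Override
  public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
    // Default expansion: plain GBK + ParDo. A runner whose GBK output cannot
    // be reiterated could override this whole composite with an
    // Iterator-based translation, because the Stream below is consumed
    // exactly once.
    PCollection<KV<K, V>> reduced =
        input
            .apply(GroupByKey.create())
            .apply(ParDo.of(
                new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    StreamSupport.stream(c.element().getValue().spliterator(), false)
                        .reduce(reducer::apply)
                        .ifPresent(v -> c.output(KV.of(c.element().getKey(), v)));
                  }
                }));
    // Output has the same shape as the input, so reuse its coder.
    return reduced.setCoder(input.getCoder());
  }
}

A runner override would replace the whole composite, so the user code calling
it would not need to change.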


It would even be possible to enable runners to provide an interface for
injecting these overrides for specific DSLs, which would be cool,
because the PTransform definition and the associated runner override could
be located outside of core and the runner itself. By that we could even
solve another issue - that different DSLs can have different guarantees
regarding various properties of stream processing - to mention one of
the most obvious, sorting by timestamp in stateful processing, although
support for that in the core SDK probably would not be that intrusive.


Jan

On 9/28/19 9:25 AM, Jan Lukavský wrote:


I understand the concerns. Still, it looks a little like we want to be
able to modify the behavior of an object from inside a submodule - quite
like if my subprogram accepted a Map interface, but internally I
would say "hey, this is supposed to be a HashMap, please change it
so". Because of how the pipeline is constructed, we can do that; the
question is whether there really isn't a better solution.


What I do not like about the proposed solution:

 1) to specify that the grouped elements are supposed to be iterated 
only once can be done only on ParDo, although there are other (higher 
level) PTransforms, that can consume output of GBK


 2) the annotation on ParDo is by definition generic - i.e. can be 
used on input which is not output of GBK, which makes no sense


 3) we modify the behavior to unlock some optimizations (or change of 
behavior of the GBK itself), users will not understand that


 4) the annotation somewhat arbitrarily modifies data types passed, 
that is counter-intuitive and will be a source of confusion


I think that a solution that solves the above problems (but brings
some new ones, as always :-)), could be to change the output of GBK from
PCollection<KV<K, Iterable<V>>> to GroupedPCollection. That way we
can control which operators (and how) consume the grouping, and we can
enable these transforms to specify additional parameters (like how
they want to consume the grouping). It is obviously a breaking change
(although it can probably be made backwards compatible) and it would
very likely involve substantial work. But maybe there are some other
not yet discussed options.


Jan

On 9/28/19 6:46 AM, Reuven Lax wrote:
In many cases, the writer of the ParDo has no access to the GBK (e.g. 
the GBK is hidden inside an upstream PTransform that they cannot 
modify). This is the same reason why RequiresStableInput was made a 
property of the ParDo, because the GroupByKey is quite often 
inaccessible.


The car analogy doesn't quite apply here, because the runner does 
have a full view of the pipeline so can satisfy all constraints. The 
car dealership generally cannot read your mind (thankfully!), so you 
have to specify what you want. Or to put it another way, the various 
transforms in a Beam pipeline do not live in isolation. The full 
pipeline graph is what is executed, and the runner already has to 
analyze the full graph to run the pipeline (as well as to optimize 
the pipeline).


Reuven

On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský wrote:


I'd suggest Stream instead of Iterator, it has the same semantics
and much better API.

Still not 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-28 Thread Jan Lukavský
I understand the concerns. Still, it looks a little like we want to be
able to modify the behavior of an object from inside a submodule - quite
like if my subprogram accepted a Map interface, but internally I would
say "hey, this is supposed to be a HashMap, please change it so".
Because of how the pipeline is constructed, we can do that; the question
is whether there really isn't a better solution.


What I do not like about the proposed solution:

 1) to specify that the grouped elements are supposed to be iterated 
only once can be done only on ParDo, although there are other (higher 
level) PTransforms, that can consume output of GBK


 2) the annotation on ParDo is by definition generic - i.e. can be 
used on input which is not output of GBK, which makes no sense


 3) we modify the behavior to unlock some optimizations (or change of 
behavior of the GBK itself), users will not understand that


 4) the annotation somewhat arbitrarily modifies data types passed, 
that is counter-intuitive and will be a source of confusion


I think that a solution that solves the above problems (but brings some
new ones, as always :-)), could be to change the output of GBK from
PCollection<KV<K, Iterable<V>>> to GroupedPCollection. That way we can
control which operators (and how) consume the grouping, and we can
enable these transforms to specify additional parameters (like how they
want to consume the grouping). It is obviously a breaking change
(although it can probably be made backwards compatible) and it would
very likely involve substantial work. But maybe there are some other
not yet discussed options.
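
Purely hypothetical, but the shape I have in mind would be something like the
following - none of these types exist today, they only make the proposal
concrete:

import java.io.Serializable;
import java.util.stream.Stream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical replacement for PCollection<KV<K, Iterable<V>>> as GBK output. */
interface GroupedPCollection<K, V> {

  /** UDF that gets each group as a one-shot Stream (single iteration only). */
  interface OneShotGroupFn<K, V, OutputT> extends Serializable {
    OutputT apply(K key, Stream<V> values);
  }

  /** UDF that gets each group as a re-iterable Iterable. */
  interface ReiterableGroupFn<K, V, OutputT> extends Serializable {
    OutputT apply(K key, Iterable<V> values);
  }

  /** Consumers declare how they read the grouping; the runner picks an expansion. */
  <OutputT> PCollection<OutputT> consumeOnce(OneShotGroupFn<K, V, OutputT> fn);

  <OutputT> PCollection<OutputT> consumeReiterable(ReiterableGroupFn<K, V, OutputT> fn);

  /** Escape hatch back to today's representation. */
  PCollection<KV<K, Iterable<V>>> asIterables();
}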


Jan

On 9/28/19 6:46 AM, Reuven Lax wrote:
In many cases, the writer of the ParDo has no access to the GBK (e.g. 
the GBK is hidden inside an upstream PTransform that they cannot 
modify). This is the same reason why RequiresStableInput was made a 
property of the ParDo, because the GroupByKey is quite often 
inaccessible.


The car analogy doesn't quite apply here, because the runner does have 
a full view of the pipeline so can satisfy all constraints. The car 
dealership generally cannot read your mind (thankfully!), so you 
have to specify what you want. Or to put it another way, the various 
transforms in a Beam pipeline do not live in isolation. The full 
pipeline graph is what is executed, and the runner already has to 
analyze the full graph to run the pipeline (as well as to optimize the 
pipeline).


Reuven

On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský wrote:


I'd suggest Stream instead of Iterator, it has the same semantics
and a much better API.

Still not sure what is wrong with letting the GBK decide this. I
have an analogy - if I decide to buy a car, I have to decide
*what* car I'm going to buy (by thinking about how I'm going to use
it) *before* I buy it. I cannot just buy "a car" and then change
it from a minivan to a sports car based on my actual need. Same with
the GBK - if I want to be able to reiterate the result, then I
should tell it in advance.

Jan

On 9/27/19 10:50 PM, Kenneth Knowles wrote:

Good point about sibling fusion requiring this.

The type PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
already does imply that the output iterable can be iterated arbitrarily many
times.

I think this should remain the default for all the reasons mentioned.

We could have opt-in to the weaker KV<K, Iterator<V>> version.
Agree that this is a property of the ParDo. A particular use of a
GBK has no idea what is downstream. If you owned the whole
pipeline, a special ParDo<KV<K, Iterator<V>>, Foo> would work. But to
make the types line up, this would require changes upstream,
which is not good.

Maybe something like this:

ParDo<KV<K, Iterable<V>>, Foo> {
  @ProcessElement
  void process(@OneShotIterator Iterator<V> iter) {
    ...
  }
}

I've described all of this in terms of Java SDK. So we would need
a portable representation for all this metadata.

Kenn

On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax wrote:

I think the behavior to make explicit is the need to
reiterate, not the need to handle large results. How large of
a result can be handled will always be dependent on the
runner, and each runner will probably have a different
definition of large keys. Reiteration however is a logical
difference in the programming API. Therefore I think it makes
sense to specify the latter. The need to reiterate is a
property of the downstream ParDo, so it should be specified
there - not on the GBK.

Reuven

On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský wrote:

Ok, I think I understand there might be some benefits of
this. Then I'd propose we make this clear on the GBK. If
we would support something like this:

 PCollection<KV<K, V>> input = ...;

 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Reuven Lax
In many cases, the writer of the ParDo has no access to the GBK (e.g. the
GBK is hidden inside an upstream PTransform that they cannot modify). This
is the same reason why RequiresStableInput was made a property of the
ParDo, because the GroupByKey is quite often inaccessible.

The car analogy doesn't quite apply here, because the runner does have a
full view of the pipeline so can satisfy all constraints. The car
dealership generally cannot read your mind (thankfully!), so you have to
specify what you want. Or to put it another way, the various transforms in
a Beam pipeline do not live in isolation. The full pipeline graph is what
is executed, and the runner already has to analyze the full graph to run
the pipeline (as well as to optimize the pipeline).

Reuven

On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský  wrote:

> I'd suggest Stream instead of Iterator, it has the same semantics and a much
> better API.
>
> Still not sure what is wrong with letting the GBK decide this. I have an
> analogy - if I decide to buy a car, I have to decide *what* car I'm going
> to buy (by thinking about how I'm going to use it) *before* I buy it. I cannot
> just buy "a car" and then change it from a minivan to a sports car based on my
> actual need. Same with the GBK - if I want to be able to reiterate the
> result, then I should tell it in advance.
>
> Jan
> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>
> Good point about sibling fusion requiring this.
>
> The type PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
> already does imply that the output iterable can be iterated arbitrarily many times.
>
> I think this should remain the default for all the reasons mentioned.
>
> We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree that
> this is a property of the ParDo. A particular use of a GBK has no idea what
> is downstream. If you owned the whole pipeline, a special
> ParDo<KV<K, Iterator<V>>, Foo> would work. But to make the types line up, this
> would require changes upstream, which is not good.
>
> Maybe something like this:
>
> ParDo<KV<K, Iterable<V>>, Foo> {
>   @ProcessElement
>   void process(@OneShotIterator Iterator<V> iter) {
> ...
>   }
> }
>
> I've described all of this in terms of Java SDK. So we would need a
> portable representation for all this metadata.
>
> Kenn
>
> On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax  wrote:
>
>> I think the behavior to make explicit is the need to reiterate, not the
>> need to handle large results. How large of a result can be handled will
>> always be dependent on the runner, and each runner will probably have a
>> different definition of large keys. Reiteration however is a logical
>> difference in the programming API. Therefore I think it makes sense to
>> specify the latter. The need to reiterate is a property of the downstream
>> ParDo, so it should be specified there - not on the GBK.
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský  wrote:
>>
>>> Ok, I think I understand there might be some benefits of this. Then I'd
>>> propose we make this clear on the GBK. If we would support something like
>>> this:
>>>
>>>  PCollection<KV<K, V>> input = ...;
>>>
>>>  input.apply(GroupByKey.withLargeKeys());
>>>
>>> then SparkRunner could expand this to repartitionAndSortWithinPartitions
>>> only on this PTransform, and fallback to the default (in memory) in other
>>> situations. The default expansion of LargeGroupByKey (let's say) would be
>>> classic GBK, so that only runners that need to make sure that they don't
>>> break reiterations can expand this.
>>>
>>> WDYT?
>>>
>>> Jan
>>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>>
>>> As I mentioned above, CoGroupByKey already takes advantage of this.
>>> Reiterating is not the most common use case, but it's definitely one that
>>> comes up. Also keep in mind that this API has supported reiterating for the
>>> past five years (since even before the SDK was donated to Apache).
>>> Therefore you should assume that users are relying on it, in ways we might
>>> not expect.
>>>
>>> Historically, Google's Flume system had collections that did not support
>>> reiterating (they were even called OneShotCollections to make it clear).
>>> This was the source of problems and user frustration, which was one reason
>>> that in the original Dataflow SDK we made sure that these iterables could
>>> be reiterated. Another reason why it's advantageous for a runner to support
>>> this is allowing for better fusion. If two ParDos A and B both read from
>>> the same GroupByKey, it is nice to be able to fuse them into one logical
>>> operator. For this, you'll probably need a shuffle implementation that
>>> allows two independent readers from the same shuffle session.
>>>
>>> How easy it is to implement reiterables that don't have to fit in memory
>>> will depend on the runner.  For Dataflow it's possible because the shuffle
>>> session is logically persistent, so the runner can simply reread the
>>> shuffle session. For other runners with different shuffle implementations,
>>> it might be harder to support both properties. Maybe 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Jan Lukavský
I'd suggest Stream instead of Iterator, it has the same semantics and
a much better API.


Still not sure what is wrong with letting the GBK decide this. I have
an analogy - if I decide to buy a car, I have to decide *what* car I'm
going to buy (by thinking about how I'm going to use it) *before* I buy it.
I cannot just buy "a car" and then change it from a minivan to a sports car
based on my actual need. Same with the GBK - if I want to be able to
reiterate the result, then I should tell it in advance.


Jan

On 9/27/19 10:50 PM, Kenneth Knowles wrote:

Good point about sibling fusion requiring this.

The type PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
already does imply that the output iterable can be iterated arbitrarily many times.


I think this should remain the default for all the reasons mentioned.

We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree
that this is a property of the ParDo. A particular use of a GBK has no
idea what is downstream. If you owned the whole pipeline, a special
ParDo<KV<K, Iterator<V>>, Foo> would work. But to make the types line up,
this would require changes upstream, which is not good.


Maybe something like this:

ParDo<KV<K, Iterable<V>>, Foo> {
  @ProcessElement
  void process(@OneShotIterator Iterator<V> iter) {
    ...
  }
}

I've described all of this in terms of Java SDK. So we would need a 
portable representation for all this metadata.


Kenn

On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax wrote:


I think the behavior to make explicit is the need to reiterate,
not the need to handle large results. How large of a result can be
handled will always be dependent on the runner, and each runner
will probably have a different definition of large keys.
Reiteration however is a logical difference in the programming
API. Therefore I think it makes sense to specify the latter. The
need to reiterate is a property of the downstream ParDo, so it
should be specified there - not on the GBK.

Reuven

On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský wrote:

Ok, I think I understand there might be some benefits of this.
Then I'd propose we make this clear on the GBK. If we would
support something like this:

 PCollection<KV<K, V>> input = ...;

 input.apply(GroupByKey.withLargeKeys());

then SparkRunner could expand this to
repartitionAndSortWithinPartitions only on this PTransform,
and fallback to the default (in memory) in other situations.
The default expansion of LargeGroupByKey (let's say) would be
classic GBK, so that only runners that need to make sure that
they don't break reiterations can expand this.

WDYT?

Jan

On 9/27/19 8:56 PM, Reuven Lax wrote:

As I mentioned above, CoGroupByKey already takes advantage of
this. Reiterating is not the most common use case, but it's
definitely one that comes up. Also keep in mind that this API
has supported reiterating for the past five years (since even
before the SDK was donated to Apache). Therefore you should
assume that users are relying on it, in ways we might not
expect.

Historically, Google's Flume system had collections that did
not support reiterating (they were even called
OneShotCollections to make it clear). This was the source of
problems and user frustration, which was one reason that in
the original Dataflow SDK we made sure that these iterables
could be reiterated. Another reason why it's advantageous for
a runner to support this is allowing for better fusion. If
two ParDos A and B both read from the same GroupByKey, it is
nice to be able to fuse them into one logical operator. For
this, you'll probably need a shuffle implementation that
allows two independent readers from the same shuffle session.

How easy it is to implement reiterables that don't have to
fit in memory will depend on the runner.  For Dataflow it's
possible because the shuffle session is logically persistent,
so the runner can simply reread the shuffle session. For
other runners with different shuffle implementations, it
might be harder to support both properties. Maybe we should
introduce a new @RequiresReiteration annotation on ParDo?
That way the Spark runner can see this and switch to the
in-memory version just for groupings consumed by those
ParDos. Runners that already support reiteration can ignore
this annotation, so it should be backwards compatible.

Reuven

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský wrote:

I'd like to know the use-case. Why would you *need* to
actually iterate the grouped elements twice? By
definition the first iteration would have to extract some
statistic (or 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Kenneth Knowles
Good point about sibling fusion requiring this.

The type PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
already does imply that the output iterable can be iterated arbitrarily many times.

I think this should remain the default for all the reasons mentioned.

We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree that
this is a property of the ParDo. A particular use of a GBK has no idea what
is downstream. If you owned the whole pipeline, a special
ParDo<KV<K, Iterator<V>>, Foo> would work. But to make the types line up, this
would require changes upstream, which is not good.

Maybe something like this:

ParDo<KV<K, Iterable<V>>, Foo> {
  @ProcessElement
  void process(@OneShotIterator Iterator<V> iter) {
    ...
  }
}

I've described all of this in terms of Java SDK. So we would need a
portable representation for all this metadata.
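
To be concrete, user code consuming the group strictly once might look like
the sketch below. SumPerKeyFn is a made-up name, and since @OneShotIterator
does not exist yet, the compilable version reads the Iterable once through an
Iterator and only notes the intended signature in a comment:

import java.util.Iterator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Sketch of a ParDo that, by construction, reads the grouped values only once. */
class SumPerKeyFn extends DoFn<KV<String, Iterable<Long>>, KV<String, Long>> {

  @ProcessElement
  public void process(
      @Element KV<String, Iterable<Long>> element,
      OutputReceiver<KV<String, Long>> out) {
    // With the proposed metadata the parameter could instead be declared as
    //   @OneShotIterator Iterator<Long> values
    // telling the runner it never has to support re-iteration for this ParDo.
    Iterator<Long> values = element.getValue().iterator();
    long sum = 0;
    while (values.hasNext()) {
      sum += values.next();
    }
    out.output(KV.of(element.getKey(), sum));
  }
}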

Kenn

On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax  wrote:

> I think the behavior to make explicit is the need to reiterate, not the
> need to handle large results. How large of a result can be handled will
> always be dependent on the runner, and each runner will probably have a
> different definition of large keys. Reiteration however is a logical
> difference in the programming API. Therefore I think it makes sense to
> specify the latter. The need to reiterate is a property of the downstream
> ParDo, so it should be specified there - not on the GBK.
>
> Reuven
>
> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský  wrote:
>
>> Ok, I think I understand there might be some benefits of this. Then I'd
>> propose we make this clear on the GBK. If we would support something like
>> this:
>>
>>  PCollection<KV<K, V>> input = ...;
>>
>>  input.apply(GroupByKey.withLargeKeys());
>>
>> then SparkRunner could expand this to repartitionAndSortWithinPartitions
>> only on this PTransform, and fallback to the default (in memory) in other
>> situations. The default expansion of LargeGroupByKey (let's say) would be
>> classic GBK, so that only runners that need to make sure that they don't
>> break reiterations can expand this.
>>
>> WDYT?
>>
>> Jan
>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>
>> As I mentioned above, CoGroupByKey already takes advantage of this.
>> Reiterating is not the most common use case, but it's definitely one that
>> comes up. Also keep in mind that this API has supported reiterating for the
>> past five years (since even before the SDK was donated to Apache).
>> Therefore you should assume that users are relying on it, in ways we might
>> not expect.
>>
>> Historically, Google's Flume system had collections that did not support
>> reiterating (they were even called OneShotCollections to make it clear).
>> This was the source of problems and user frustration, which was one reason
>> that in the original Dataflow SDK we made sure that these iterables could
>> be reiterated. Another reason why it's advantageous for a runner to support
>> this is allowing for better fusion. If two ParDos A and B both read from
>> the same GroupByKey, it is nice to be able to fuse them into one logical
>> operator. For this, you'll probably need a shuffle implementation that
>> allows two independent readers from the same shuffle session.
>>
>> How easy it is to implement reiterables that don't have to fit in memory
>> will depend on the runner.  For Dataflow it's possible because the shuffle
>> session is logically persistent, so the runner can simply reread the
>> shuffle session. For other runners with different shuffle implementations,
>> it might be harder to support both properties. Maybe we should introduce a
>> new @RequiresReiteration annotation on ParDo? That way the Spark runner can
>> see this and switch to the in-memory version just for groupings consumed by
>> those ParDos. Runners that already support reiteration can ignore this
>> annotation, so it should be backwards compatible.
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský  wrote:
>>
>>> I'd like to know the use-case. Why would you *need* to actually iterate
>>> the grouped elements twice? By definition the first iteration would have to
>>> extract some statistic (or subset of elements that must fit into memory).
>>> This statistic can then be used as another input for the second iteration.
>>> Why not then calculate the statistic in a separate branch in the pipeline
>>> and feed it then into the ParDo as side input? That would be definitely
>>> more efficient, because the calculation of the statistic would be probably
>>> combinable (not sure if it is absolutely required to be combinable, but it
>>> seems probable). Even if the calculation would not be combinable, it is not
>>> less efficient than reiterating twice. Why then support multiple iterations
>>> (besides the fact that output of GBK is Iterable). Am I missing something?
>>>
>>> Jan
>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>
>>> This should be an element in the compatibility matrix as well.
>>>
>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles  wrote:
>>>
 I am pretty surprised that we do not have a 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Reuven Lax
I think the behavior to make explicit is the need to reiterate, not the
need to handle large results. How large of a result can be handled will
always be dependent on the runner, and each runner will probably have a
different definition of large keys. Reiteration however is a logical
difference in the programming API. Therefore I think it makes sense to
specify the latter. The need to reiterate is a property of the downstream
ParDo, so it should be specified there - not on the GBK.

Reuven

On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský  wrote:

> Ok, I think I understand there might be some benefits of this. Then I'd
> propose we make this clear on the GBK. If we would support something like
> this:
>
>  PCollection<KV<K, V>> input = ...;
>
>  input.apply(GroupByKey.withLargeKeys());
>
> then SparkRunner could expand this to repartitionAndSortWithinPartitions
> only on this PTransform, and fallback to the default (in memory) in other
> situations. The default expansion of LargeGroupByKey (let's say) would be
> classic GBK, so that only runners that need to make sure that they don't
> break reiterations can expand this.
>
> WDYT?
>
> Jan
> On 9/27/19 8:56 PM, Reuven Lax wrote:
>
> As I mentioned above, CoGroupByKey already takes advantage of this.
> Reiterating is not the most common use case, but it's definitely one that
> comes up. Also keep in mind that this API has supported reiterating for the
> past five years (since even before the SDK was donated to Apache).
> Therefore you should assume that users are relying on it, in ways we might
> not expect.
>
> Historically, Google's Flume system had collections that did not support
> reiterating (they were even called OneShotCollections to make it clear).
> This was the source of problems and user frustration, which was one reason
> that in the original Dataflow SDK we made sure that these iterables could
> be reiterated. Another reason why it's advantageous for a runner to support
> this is allowing for better fusion. If two ParDos A and B both read from
> the same GroupByKey, it is nice to be able to fuse them into one logical
> operator. For this, you'll probably need a shuffle implementation that
> allows two independent readers from the same shuffle session.
>
> How easy it is to implement reiterables that don't have to fit in memory
> will depend on the runner.  For Dataflow it's possible because the shuffle
> session is logically persistent, so the runner can simply reread the
> shuffle session. For other runners with different shuffle implementations,
> it might be harder to support both properties. Maybe we should introduce a
> new @RequiresReiteration annotation on ParDo? That way the Spark runner can
> see this and switch to the in-memory version just for groupings consumed by
> those ParDos. Runners that already support reiteration can ignore this
> annotation, so it should be backwards compatible.
>
> Reuven
>
> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský  wrote:
>
>> I'd like to know the use-case. Why would you *need* to actually iterate
>> the grouped elements twice? By definition the first iteration would have to
>> extract some statistic (or subset of elements that must fit into memory).
>> This statistic can then be used as another input for the second iteration.
>> Why not then calculate the statistic in a separate branch in the pipeline
>> and feed it then into the ParDo as side input? That would be definitely
>> more efficient, because the calculation of the statistic would be probably
>> combinable (not sure if it is absolutely required to be combinable, but it
>> seems probable). Even if the calculation would not be combinable, it is not
>> less efficient than reiterating twice. Why then support multiple iterations
>> (besides the fact that output of GBK is Iterable). Am I missing something?
>>
>> Jan
>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>
>> This should be an element in the compatibility matrix as well.
>>
>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles  wrote:
>>
>>> I am pretty surprised that we do not have a @Category(ValidatesRunner)
>>> test in GroupByKeyTest that iterates multiple times. That is a major
>>> oversight. We should have this test, and it can be disabled by the
>>> SparkRunner's configuration.
>>>
>>> Kenn
>>>
>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax  wrote:
>>>
 The Dataflow version does not spill to disk. However Spark's design
 might require spilling to disk if you want that to be implemented properly.

 On Fri, Sep 27, 2019 at 9:08 AM David Morávek  wrote:

> Hi,
>
> Spark's GBK is currently implemented using `sortBy(key and
> value).mapPartition(...)` for non-merging windowing in order to support
> large keys and large scale shuffles. Merging windowing is implemented 
> using
> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
> which is by design unable to support large keys.
>
> As Jan noted, problem with mapPartition is, 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Jan Lukavský
Ok, I think I understand there might be some benefits of this. Then I'd 
propose we make this clear on the GBK. If we would support something like
this:


 PCollection<KV<K, V>> input = ...;

 input.apply(GroupByKey.withLargeKeys());

then SparkRunner could expand this to repartitionAndSortWithinPartitions 
only on this PTransform, and fallback to the default (in memory) in 
other situations. The default expansion of LargeGroupByKey (let's say) 
would be classic GBK, so that only runners that need to make sure that 
they don't break reiterations can expand this.
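
A sketch of what I mean - the default expansion of LargeGroupByKey is just
today's GBK, and a runner override would swap in the sort-based translation
(only the class below is made up, as an illustration):

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Hypothetical opt-in transform. Runners that keep GBK output re-iterable can
 * leave the default expansion alone; SparkRunner could override this
 * composite with its repartitionAndSortWithinPartitions-based translation and
 * document that the resulting Iterable must be consumed only once.
 */
class LargeGroupByKey<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

  static <K, V> LargeGroupByKey<K, V> create() {
    return new LargeGroupByKey<>();
  }

  @Override
  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
    // Default: behave exactly like a regular GroupByKey.
    return input.apply(GroupByKey.create());
  }
}

Users would then write input.apply(LargeGroupByKey.create()) only where they
are fine with single-pass iteration.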


WDYT?

Jan

On 9/27/19 8:56 PM, Reuven Lax wrote:
As I mentioned above, CoGroupByKey already takes advantage of this. 
Reiterating is not the most common use case, but it's definitely one 
that comes up. Also keep in mind that this API has supported 
reiterating for the past five years (since even before the SDK was 
donated to Apache). Therefore you should assume that users are relying 
on it, in ways we might not expect.


Historically, Google's Flume system had collections that did not 
support reiterating (they were even called OneShotCollections to make 
it clear). This was the source of problems and user frustration, which 
was one reason that in the original Dataflow SDK we made sure that 
these iterables could be reiterated. Another reason why it's 
advantageous for a runner to support this is allowing for better 
fusion. If two ParDos A and B both read from the same GroupByKey, it 
is nice to be able to fuse them into one logical operator. For this, 
you'll probably need a shuffle implementation that allows two 
independent readers from the same shuffle session.


How easy it is to implement reiterables that don't have to fit in 
memory will depend on the runner.  For Dataflow it's possible because 
the shuffle session is logically persistent, so the runner can simply 
reread the shuffle session. For other runners with different shuffle 
implementations, it might be harder to support both properties. Maybe 
we should introduce a new @RequiresReiteration annotation on ParDo? 
That way the Spark runner can see this and switch to the in-memory 
version just for groupings consumed by those ParDos. Runners that 
already support reiteration can ignore this annotation, so it should 
be backwards compatible.


Reuven

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský wrote:


I'd like to know the use-case. Why would you *need* to actually
iterate the grouped elements twice? By definition the first
iteration would have to extract some statistic (or subset of
elements that must fit into memory). This statistic can then be
used as another input for the second iteration. Why not then
calculate the statistic in a separate branch in the pipeline and
feed it then into the ParDo as side input? That would be
definitely more efficient, because the calculation of the
statistic would be probably combinable (not sure if it is
absolutely required to be combinable, but it seems probable). Even
if the calculation would not be combinable, it is not less
efficient than reiterating twice. Why then support multiple
iterations (besides the fact that output of GBK is Iterable). Am I
missing something?

Jan

On 9/27/19 6:36 PM, Reuven Lax wrote:

This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles wrote:

I am pretty surprised that we do not have
a @Category(ValidatesRunner) test in GroupByKeyTest that
iterates multiple times. That is a major oversight. We should
have this test, and it can be disabled by the SparkRunner's
configuration.

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax wrote:

The Dataflow version does not spill to disk. However
Spark's design might require spilling to disk if you want
that to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek wrote:

Hi,

Spark's GBK is currently implemented using
`sortBy(key and value).mapPartition(...)` for
non-merging windowing in order to support large keys
and large scale shuffles. Merging windowing is
implemented using standard GBK (underlying spark
impl. uses ListCombiner + Hash Grouping), which is by
design unable to support large keys.

As Jan noted, problem with mapPartition is, that its
UDF receives an Iterator. Only option here is to wrap
this iterator to one that spills to disk once an
internal buffer is exceeded (the approach suggested
by Reuven). This unfortunately comes with a cost in
some cases. The best approach would be to 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Jan Lukavský

> But - does it imply that it is actually required O(n^2)

I meant O(n) iterations, i.e. O(n^2) operations on elements.

On 9/27/19 8:31 PM, Jan Lukavský wrote:


Okay, the self-join example is understandable. But - does it imply
that O(n^2) iterations are actually required (maybe caching can
somehow help, but asymptotically the complexity will be this)? If so,
that seems prohibitively slow (for large inputs that don't
fit into memory), and actually materializing the cartesian product
might help parallelize the process?


On 9/27/19 8:19 PM, Kenneth Knowles wrote:
CoGroupByKey is one example. To perform a CoGroupByKey based join 
requires multiple iterations (caching is key to getting performance). 
You could make up other calculations that require it, most of which 
would look like a self-join, like "output the largest difference 
between any two elements for each key". In both these examples, 
multiple iterations avoids materializing a cartesian product but 
allows more like a nested loop join.


On the philosophical side, an iterable behaves like a "value" (it has 
well-defined contents, a size, etc) whereas an iterator is more of a 
mutating process than a value. So you cannot easily reason about a 
transform "for input X has output Y" when the output is a construct 
that cannot be interpreted as a value at all.


Kenn

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský wrote:


I'd like to know the use-case. Why would you *need* to actually
iterate the grouped elements twice? By definition the first
iteration would have to extract some statistic (or subset of
elements that must fit into memory). This statistic can then be
used as another input for the second iteration. Why not then
calculate the statistic in a separate branch in the pipeline and
feed it then into the ParDo as side input? That would be
definitely more efficient, because the calculation of the
statistic would be probably combinable (not sure if it is
absolutely required to be combinable, but it seems probable).
Even if the calculation would not be combinable, it is not less
efficient than reiterating twice. Why then support multiple
iterations (besides the fact that output of GBK is Iterable). Am
I missing something?

Jan

On 9/27/19 6:36 PM, Reuven Lax wrote:

This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles wrote:

I am pretty surprised that we do not have
a @Category(ValidatesRunner) test in GroupByKeyTest that
iterates multiple times. That is a major oversight. We
should have this test, and it can be disabled by the
SparkRunner's configuration.

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax wrote:

The Dataflow version does not spill to disk. However
Spark's design might require spilling to disk if you
want that to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek wrote:

Hi,

Spark's GBK is currently implemented using
`sortBy(key and value).mapPartition(...)` for
non-merging windowing in order to support large keys
and large scale shuffles. Merging windowing is
implemented using standard GBK (underlying spark
impl. uses ListCombiner + Hash Grouping), which is
by design unable to support large keys.

As Jan noted, problem with mapPartition is, that its
UDF receives an Iterator. Only option here is to
wrap this iterator to one that spills to disk once
an internal buffer is exceeded (the approach
suggested by Reuven). This unfortunately comes with
a cost in some cases. The best approach would be to
somehow determine, that user wants multiple
iterations and than wrap it in "re-iterator" if
necessary. Does anyone have any ideas how to
approach this?

D.

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax wrote:

The Beam API was written to support multiple
iterations, and there are definitely transforms
that do so. I believe that CoGroupByKey may do
this as well with the resulting iterator.

I know that the Dataflow runner is able to
handles iterators larger than available memory
by paging them in from shuffle, which still
allows for reiterating. It sounds like Spark is
less flexible here?

Reuven

  

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Reuven Lax
As I mentioned above, CoGroupByKey already takes advantage of this.
Reiterating is not the most common use case, but it's definitely one that
comes up. Also keep in mind that this API has supported reiterating for the
past five years (since even before the SDK was donated to Apache).
Therefore you should assume that users are relying on it, in ways we might
not expect.

Historically, Google's Flume system had collections that did not support
reiterating (they were even called OneShotCollections to make it clear).
This was the source of problems and user frustration, which was one reason
that in the original Dataflow SDK we made sure that these iterables could
be reiterated. Another reason why it's advantageous for a runner to support
this is allowing for better fusion. If two ParDos A and B both read from
the same GroupByKey, it is nice to be able to fuse them into one logical
operator. For this, you'll probably need a shuffle implementation that
allows two independent readers from the same shuffle session.

How easy it is to implement reiterables that don't have to fit in memory
will depend on the runner.  For Dataflow it's possible because the shuffle
session is logically persistent, so the runner can simply reread the
shuffle session. For other runners with different shuffle implementations,
it might be harder to support both properties. Maybe we should introduce a
new @RequiresReiteration annotation on ParDo? That way the Spark runner can
see this and switch to the in-memory version just for groupings consumed by
those ParDos. Runners that already support reiteration can ignore this
annotation, so it should be backwards compatible.
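
As a sketch of what that could look like for users - the annotation itself is
hypothetical and is declared below only for illustration, in the same style as
the existing @RequiresStableInput on a process method; the DoFn and its logic
are made up:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Hypothetical method annotation, analogous in spirit to @RequiresStableInput. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresReiteration {}

class MaxDeviationFn extends DoFn<KV<String, Iterable<Double>>, KV<String, Double>> {

  // A runner like Spark could see this annotation and switch the upstream
  // grouping to an implementation whose Iterable can be replayed.
  @RequiresReiteration
  @ProcessElement
  public void process(
      @Element KV<String, Iterable<Double>> element,
      OutputReceiver<KV<String, Double>> out) {
    // First pass: mean.
    double sum = 0;
    long count = 0;
    for (double v : element.getValue()) {
      sum += v;
      count++;
    }
    double mean = count == 0 ? 0 : sum / count;
    // Second pass: max deviation from the mean (this is the re-iteration).
    double maxDev = 0;
    for (double v : element.getValue()) {
      maxDev = Math.max(maxDev, Math.abs(v - mean));
    }
    out.output(KV.of(element.getKey(), maxDev));
  }
}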

Reuven

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský  wrote:

> I'd like to know the use-case. Why would you *need* to actually iterate
> the grouped elements twice? By definition the first iteration would have to
> extract some statistic (or subset of elements that must fit into memory).
> This statistic can then be used as another input for the second iteration.
> Why not then calculate the statistic in a separate branch in the pipeline
> and feed it then into the ParDo as side input? That would be definitely
> more efficient, because the calculation of the statistic would be probably
> combinable (not sure if it is absolutely required to be combinable, but it
> seems probable). Even if the calculation would not be combinable, it is not
> less efficient than reiterating twice. Why then support multiple iterations
> (besides the fact that output of GBK is Iterable). Am I missing something?
>
> Jan
> On 9/27/19 6:36 PM, Reuven Lax wrote:
>
> This should be an element in the compatibility matrix as well.
>
> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles  wrote:
>
>> I am pretty surprised that we do not have a @Category(ValidatesRunner)
>> test in GroupByKeyTest that iterates multiple times. That is a major
>> oversight. We should have this test, and it can be disabled by the
>> SparkRunner's configuration.
>>
>> Kenn
>>
>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax  wrote:
>>
>>> The Dataflow version does not spill to disk. However Spark's design
>>> might require spilling to disk if you want that to be implemented properly.
>>>
>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek  wrote:
>>>
 Hi,

 Spark's GBK is currently implemented using `sortBy(key and
 value).mapPartition(...)` for non-merging windowing in order to support
 large keys and large scale shuffles. Merging windowing is implemented using
 standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
 which is by design unable to support large keys.

 As Jan noted, problem with mapPartition is, that its UDF receives an
 Iterator. Only option here is to wrap this iterator to one that spills to
 disk once an internal buffer is exceeded (the approach suggested by
 Reuven). This unfortunately comes with a cost in some cases. The best
 approach would be to somehow determine, that user wants multiple iterations
 and than wrap it in "re-iterator" if necessary. Does anyone have any ideas
 how to approach this?

 D.

 On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax  wrote:

> The Beam API was written to support multiple iterations, and there are
> definitely transforms that do so. I believe that CoGroupByKey may do this
> as well with the resulting iterator.
>
> I know that the Dataflow runner is able to handles iterators larger
> than available memory by paging them in from shuffle, which still allows
> for reiterating. It sounds like Spark is less flexible here?
>
> Reuven
>
> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:
>
>> +dev  
>>
>> Lukasz, why do you think that users expect to be able to iterate
>> multiple times grouped elements? Besides that it obviously suggests the
>> 'Iterable'? The way that spark behaves is pretty much analogous to how
>> 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Jan Lukavský
Okay, the self-join example is understandable. But - does it imply that
O(n^2) iterations are actually required (maybe caching can somehow
help, but asymptotically the complexity will be this)? If so, that
seems prohibitively slow (for large inputs that don't fit
into memory), and actually materializing the cartesian product might
help parallelize the process?


On 9/27/19 8:19 PM, Kenneth Knowles wrote:
CoGroupByKey is one example. To perform a CoGroupByKey based join 
requires multiple iterations (caching is key to getting performance). 
You could make up other calculations that require it, most of which 
would look like a self-join, like "output the largest difference 
between any two elements for each key". In both these examples, 
multiple iterations avoids materializing a cartesian product but 
allows more like a nested loop join.


On the philosophical side, an iterable behaves like a "value" (it has 
well-defined contents, a size, etc) whereas an iterator is more of a 
mutating process than a value. So you cannot easily reason about a 
transform "for input X has output Y" when the output is a construct 
that cannot be interpreted as a value at all.


Kenn

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský wrote:


I'd like to know the use-case. Why would you *need* to actually
iterate the grouped elements twice? By definition the first
iteration would have to extract some statistic (or subset of
elements that must fit into memory). This statistic can then be
used as another input for the second iteration. Why not then
calculate the statistic in a separate branch in the pipeline and
feed it then into the ParDo as side input? That would be
definitely more efficient, because the calculation of the
statistic would be probably combinable (not sure if it is
absolutely required to be combinable, but it seems probable). Even
if the calculation would not be combinable, it is not less
efficient than reiterating twice. Why then support multiple
iterations (besides the fact that output of GBK is Iterable). Am I
missing something?

Jan

On 9/27/19 6:36 PM, Reuven Lax wrote:

This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles wrote:

I am pretty surprised that we do not have
a @Category(ValidatesRunner) test in GroupByKeyTest that
iterates multiple times. That is a major oversight. We should
have this test, and it can be disabled by the SparkRunner's
configuration.

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax wrote:

The Dataflow version does not spill to disk. However
Spark's design might require spilling to disk if you want
that to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek wrote:

Hi,

Spark's GBK is currently implemented using
`sortBy(key and value).mapPartition(...)` for
non-merging windowing in order to support large keys
and large scale shuffles. Merging windowing is
implemented using standard GBK (underlying spark
impl. uses ListCombiner + Hash Grouping), which is by
design unable to support large keys.

As Jan noted, problem with mapPartition is, that its
UDF receives an Iterator. Only option here is to wrap
this iterator to one that spills to disk once an
internal buffer is exceeded (the approach suggested
by Reuven). This unfortunately comes with a cost in
some cases. The best approach would be to somehow
determine, that user wants multiple iterations and
than wrap it in "re-iterator" if necessary. Does
anyone have any ideas how to approach this?

D.

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax wrote:

The Beam API was written to support multiple
iterations, and there are definitely transforms
that do so. I believe that CoGroupByKey may do
this as well with the resulting iterator.

I know that the Dataflow runner is able to
handles iterators larger than available memory by
paging them in from shuffle, which still allows
for reiterating. It sounds like Spark is less
flexible here?

Reuven

On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský wrote:

+dev 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Kenneth Knowles
CoGroupByKey is one example. To perform a CoGroupByKey based join requires
multiple iterations (caching is key to getting performance). You could make
up other calculations that require it, most of which would look like a
self-join, like "output the largest difference between any two elements for
each key". In both these examples, multiple iterations avoids materializing
a cartesian product but allows more like a nested loop join.
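
For example, the "largest difference" calculation would naturally be written
as a nested loop over the same grouped values (class name and types are made
up), which is exactly where re-iteration matters:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Largest |a - b| over all pairs per key, via a nested-loop "self-join". */
class LargestDifferenceFn extends DoFn<KV<String, Iterable<Long>>, KV<String, Long>> {

  @ProcessElement
  public void process(
      @Element KV<String, Iterable<Long>> element,
      OutputReceiver<KV<String, Long>> out) {
    long largest = 0;
    for (long a : element.getValue()) {
      // The inner loop re-iterates the same Iterable, so the runner must
      // either keep the group in memory or be able to replay it.
      for (long b : element.getValue()) {
        largest = Math.max(largest, Math.abs(a - b));
      }
    }
    out.output(KV.of(element.getKey(), largest));
  }
}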

On the philosophical side, an iterable behaves like a "value" (it has
well-defined contents, a size, etc) whereas an iterator is more of a
mutating process than a value. So you cannot easily reason about a
transform "for input X has output Y" when the output is a construct that
cannot be interpreted as a value at all.

Kenn

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský  wrote:

> I'd like to know the use-case. Why would you *need* to actually iterate
> the grouped elements twice? By definition the first iteration would have to
> extract some statistic (or subset of elements that must fit into memory).
> This statistic can then be used as another input for the second iteration.
> Why not then calculate the statistic in a separate branch in the pipeline
> and feed it then into the ParDo as side input? That would be definitely
> more efficient, because the calculation of the statistic would be probably
> combinable (not sure if it is absolutely required to be combinable, but it
> seems probable). Even if the calculation would not be combinable, it is not
> less efficient than reiterating twice. Why then support multiple iterations
> (besides the fact that output of GBK is Iterable). Am I missing something?
>
> Jan
> On 9/27/19 6:36 PM, Reuven Lax wrote:
>
> This should be an element in the compatibility matrix as well.
>
> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles  wrote:
>
>> I am pretty surprised that we do not have a @Category(ValidatesRunner)
>> test in GroupByKeyTest that iterates multiple times. That is a major
>> oversight. We should have this test, and it can be disabled by the
>> SparkRunner's configuration.
>>
>> Kenn
>>
>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax  wrote:
>>
>>> The Dataflow version does not spill to disk. However Spark's design
>>> might require spilling to disk if you want that to be implemented properly.
>>>
>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek  wrote:
>>>
 Hi,

 Spark's GBK is currently implemented using `sortBy(key and
 value).mapPartition(...)` for non-merging windowing in order to support
 large keys and large scale shuffles. Merging windowing is implemented using
 standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
 which is by design unable to support large keys.

 As Jan noted, problem with mapPartition is, that its UDF receives an
 Iterator. Only option here is to wrap this iterator to one that spills to
 disk once an internal buffer is exceeded (the approach suggested by
 Reuven). This unfortunately comes with a cost in some cases. The best
 approach would be to somehow determine, that user wants multiple iterations
 and than wrap it in "re-iterator" if necessary. Does anyone have any ideas
 how to approach this?

 D.

 On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax  wrote:

> The Beam API was written to support multiple iterations, and there are
> definitely transforms that do so. I believe that CoGroupByKey may do this
> as well with the resulting iterator.
>
> I know that the Dataflow runner is able to handles iterators larger
> than available memory by paging them in from shuffle, which still allows
> for reiterating. It sounds like Spark is less flexible here?
>
> Reuven
>
> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:
>
>> +dev  
>>
>> Lukasz, why do you think that users expect to be able to iterate
>> multiple times grouped elements? Besides that it obviously suggests the
>> 'Iterable'? The way that spark behaves is pretty much analogous to how
>> MapReduce used to work - in certain cases it calls
>> repartitionAndSortWithinPartitions and then does mapPartition, which
>> accepts Iterator - that is because internally it merge sorts pre sorted
>> segments. This approach enables to GroupByKey data sets that are too big 
>> to
>> fit into memory (per key).
>>
>> If multiple iterations should be expected by users, we probably
>> should:
>>
>>  a) include that in @ValidatesRunner tests
>>
>>  b) store values in memory on spark, which will break for certain
>> pipelines
>>
>> Because of (b) I think that it would be much better to remove this
>> "expectation" and clearly document that the Iterable is not supposed to 
>> be
>> iterated multiple times.
>>
>> Jan
>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>
>> I pretty much think so, because that is how Spark 

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Jan Lukavský
I'd like to know the use-case. Why would you *need* to actually iterate 
the grouped elements twice? By definition, the first iteration would have 
to extract some statistic (or a subset of elements that must fit into 
memory). This statistic can then be used as another input for the second 
iteration. Why not calculate the statistic in a separate branch of the 
pipeline and feed it into the ParDo as a side input? That would definitely 
be more efficient, because the calculation of the statistic would probably 
be combinable (I am not sure it is absolutely required to be combinable, 
but it seems probable). Even if the calculation were not combinable, it 
would be no less efficient than reiterating twice. Why then support 
multiple iterations (besides the fact that the output of GBK is an 
Iterable)? Am I missing something?
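
For illustration, a minimal sketch of that side-input alternative (Beam Java; 
the per-key sum and all names here are invented for the example, not anything 
from the thread):

Pipeline p = Pipeline.create();
PCollection<KV<String, Integer>> keyed =
    p.apply(Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)));

// Branch 1: the combinable statistic (here a per-key sum), exposed as a
// map-valued side input.
PCollectionView<Map<String, Integer>> statsView =
    keyed.apply(Sum.integersPerKey()).apply(View.asMap());

// Branch 2: the grouped values, iterated exactly once; the statistic is
// looked up from the side input instead of a second pass over the Iterable.
keyed
    .apply(GroupByKey.create())
    .apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            int sum = c.sideInput(statsView).get(c.element().getKey());
            for (int v : c.element().getValue()) {
              c.output(c.element().getKey() + ": " + (v - sum));
            }
          }
        }).withSideInputs(statsView));

p.run();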


Jan

On 9/27/19 6:36 PM, Reuven Lax wrote:

This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles wrote:


I am pretty surprised that we do not have
a @Category(ValidatesRunner) test in GroupByKeyTest that iterates
multiple times. That is a major oversight. We should have this
test, and it can be disabled by the SparkRunner's configuration.

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:

The Dataflow version does not spill to disk. However Spark's
design might require spilling to disk if you want that to be
implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:

Hi,

Spark's GBK is currently implemented using `sortBy(key and
value).mapPartition(...)` for non-merging windowing in
order to support large keys and large scale shuffles.
Merging windowing is implemented using standard GBK
(underlying spark impl. uses ListCombiner + Hash
Grouping), which is by design unable to support large keys.

As Jan noted, problem with mapPartition is, that its UDF
receives an Iterator. Only option here is to wrap this
iterator to one that spills to disk once an internal
buffer is exceeded (the approach suggested by Reuven).
This unfortunately comes with a cost in some cases. The
best approach would be to somehow determine, that user
wants multiple iterations and than wrap it in
"re-iterator" if necessary. Does anyone have any ideas how
to approach this?

D.

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:

The Beam API was written to support multiple
iterations, and there are definitely transforms that
do so. I believe that CoGroupByKey may do this as well
with the resulting iterator.

I know that the Dataflow runner is able to handles
iterators larger than available memory by paging them
in from shuffle, which still allows for reiterating.
It sounds like Spark is less flexible here?

Reuven

On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:

+dev 


Lukasz, why do you think that users expect to be
able to iterate multiple times grouped elements?
Besides that it obviously suggests the 'Iterable'?
The way that spark behaves is pretty much
analogous to how MapReduce used to work - in
certain cases it calles
repartitionAndSortWithinPartitions and then does
mapPartition, which accepts Iterator - that is
because internally it merge sorts pre sorted
segments. This approach enables to GroupByKey data
sets that are too big to fit into memory (per key).

If multiple iterations should be expected by
users, we probably should:

 a) include that in @ValidatesRunner tests

 b) store values in memory on spark, which will
break for certain pipelines

Because of (b) I think that it would be much
better to remove this "expectation" and clearly
document that the Iterable is not supposed to be
iterated multiple times.

Jan

On 9/27/19 9:27 AM, Jan Lukavský wrote:


I pretty much think so, because that is how Spark
works. The Iterable inside is really an Iterator,
which cannot be iterated multiple times.

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Reuven Lax
This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles  wrote:

> I am pretty surprised that we do not have a @Category(ValidatesRunner)
> test in GroupByKeyTest that iterates multiple times. That is a major
> oversight. We should have this test, and it can be disabled by the
> SparkRunner's configuration.
>
> Kenn
>
> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax  wrote:
>
>> The Dataflow version does not spill to disk. However Spark's design might
>> require spilling to disk if you want that to be implemented properly.
>>
>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek  wrote:
>>
>>> Hi,
>>>
>>> Spark's GBK is currently implemented using `sortBy(key and
>>> value).mapPartition(...)` for non-merging windowing in order to support
>>> large keys and large scale shuffles. Merging windowing is implemented using
>>> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
>>> which is by design unable to support large keys.
>>>
>>> As Jan noted, problem with mapPartition is, that its UDF receives an
>>> Iterator. Only option here is to wrap this iterator to one that spills to
>>> disk once an internal buffer is exceeded (the approach suggested by
>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>> approach would be to somehow determine, that user wants multiple iterations
>>> and than wrap it in "re-iterator" if necessary. Does anyone have any ideas
>>> how to approach this?
>>>
>>> D.
>>>
>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax  wrote:
>>>
 The Beam API was written to support multiple iterations, and there are
 definitely transforms that do so. I believe that CoGroupByKey may do this
 as well with the resulting iterator.

 I know that the Dataflow runner is able to handles iterators larger
 than available memory by paging them in from shuffle, which still allows
 for reiterating. It sounds like Spark is less flexible here?

 Reuven

 On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:

> +dev  
>
> Lukasz, why do you think that users expect to be able to iterate
> multiple times grouped elements? Besides that it obviously suggests the
> 'Iterable'? The way that spark behaves is pretty much analogous to how
> MapReduce used to work - in certain cases it calles
> repartitionAndSortWithinPartitions and then does mapPartition, which
> accepts Iterator - that is because internally it merge sorts pre sorted
> segments. This approach enables to GroupByKey data sets that are too big to
> fit into memory (per key).
>
> If multiple iterations should be expected by users, we probably should:
>
>  a) include that in @ValidatesRunner tests
>
>  b) store values in memory on spark, which will break for certain
> pipelines
>
> Because of (b) I think that it would be much better to remove this
> "expectation" and clearly document that the Iterable is not supposed to be
> iterated multiple times.
>
> Jan
> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>
> I pretty much think so, because that is how Spark works. The Iterable
> inside is really an Iterator, which cannot be iterated multiple times.
>
> Jan
> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>
> Jan, in Beam users expect to be able to iterate the GBK output
> multiple times even from within the same ParDo.
> Is this something that Beam on Spark Runner never supported?
>
> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:
>
>> Hi Gershi,
>>
>> could you please outline the pipeline you are trying to execute?
>> Basically, you cannot iterate the Iterable multiple times in single 
>> ParDo.
>> It should be possible, though, to apply multiple ParDos to output from
>> GroupByKey.
>>
>> Jan
>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>
>> Hi,
>>
>>
>>
>> I want to iterate multiple times on the Iterable (the output of
>> GroupByKey transformation)
>>
>> When my Runner is SparkRunner, I get an exception:
>>
>>
>>
>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>> iterated more than once,otherwise there could be data lost
>>
>> at
>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>
>> at java.lang.Iterable.spliterator(Iterable.java:101)
>>
>>
>>
>>
>>
>> I understood I can branch the pipeline after GroupByKey into multiple
>> transformation and iterate in each of them once on the Iterable.
>>
>>
>>
>> Is there a better way for that?
>>
>>
>>
>>
>>
>> *Noam Gershi*
>>
>> Software Developer

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Kenneth Knowles
I am pretty surprised that we do not have a @Category(ValidatesRunner) test
in GroupByKeyTest that iterates multiple times. That is a major oversight.
We should have this test, and it can be disabled by the SparkRunner's
configuration.
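
Roughly, something along these lines (just a sketch of such a test, not the
actual code; the test name and values are invented):

@Rule public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(ValidatesRunner.class)
public void testGroupByKeyAllowsMultipleIterations() {
  PCollection<KV<String, Integer>> sums =
      p.apply(Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3)))
          .apply(GroupByKey.create())
          .apply(ParDo.of(
              new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                  int first = 0;
                  for (int v : c.element().getValue()) {
                    first += v;
                  }
                  // second pass over the same Iterable - the part the
                  // SparkRunner currently rejects
                  int second = 0;
                  for (int v : c.element().getValue()) {
                    second += v;
                  }
                  c.output(KV.of(c.element().getKey(), first + second));
                }
              }));

  PAssert.that(sums).containsInAnyOrder(KV.of("k", 12));
  p.run();
}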

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax  wrote:

> The Dataflow version does not spill to disk. However Spark's design might
> require spilling to disk if you want that to be implemented properly.
>
> On Fri, Sep 27, 2019 at 9:08 AM David Morávek  wrote:
>
>> Hi,
>>
>> Spark's GBK is currently implemented using `sortBy(key and
>> value).mapPartition(...)` for non-merging windowing in order to support
>> large keys and large scale shuffles. Merging windowing is implemented using
>> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
>> which is by design unable to support large keys.
>>
>> As Jan noted, problem with mapPartition is, that its UDF receives an
>> Iterator. Only option here is to wrap this iterator to one that spills to
>> disk once an internal buffer is exceeded (the approach suggested by
>> Reuven). This unfortunately comes with a cost in some cases. The best
>> approach would be to somehow determine, that user wants multiple iterations
>> and than wrap it in "re-iterator" if necessary. Does anyone have any ideas
>> how to approach this?
>>
>> D.
>>
>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax  wrote:
>>
>>> The Beam API was written to support multiple iterations, and there are
>>> definitely transforms that do so. I believe that CoGroupByKey may do this
>>> as well with the resulting iterator.
>>>
>>> I know that the Dataflow runner is able to handles iterators larger than
>>> available memory by paging them in from shuffle, which still allows for
>>> reiterating. It sounds like Spark is less flexible here?
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:
>>>
 +dev  

 Lukasz, why do you think that users expect to be able to iterate
 multiple times grouped elements? Besides that it obviously suggests the
 'Iterable'? The way that spark behaves is pretty much analogous to how
 MapReduce used to work - in certain cases it calles
 repartitionAndSortWithinPartitions and then does mapPartition, which
 accepts Iterator - that is because internally it merge sorts pre sorted
 segments. This approach enables to GroupByKey data sets that are too big to
 fit into memory (per key).

 If multiple iterations should be expected by users, we probably should:

  a) include that in @ValidatesRunner tests

  b) store values in memory on spark, which will break for certain
 pipelines

 Because of (b) I think that it would be much better to remove this
 "expectation" and clearly document that the Iterable is not supposed to be
 iterated multiple times.

 Jan
 On 9/27/19 9:27 AM, Jan Lukavský wrote:

 I pretty much think so, because that is how Spark works. The Iterable
 inside is really an Iterator, which cannot be iterated multiple times.

 Jan
 On 9/27/19 2:00 AM, Lukasz Cwik wrote:

 Jan, in Beam users expect to be able to iterate the GBK output multiple
 times even from within the same ParDo.
 Is this something that Beam on Spark Runner never supported?

 On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:

> Hi Gershi,
>
> could you please outline the pipeline you are trying to execute?
> Basically, you cannot iterate the Iterable multiple times in single ParDo.
> It should be possible, though, to apply multiple ParDos to output from
> GroupByKey.
>
> Jan
> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>
> Hi,
>
>
>
> I want to iterate multiple times on the Iterable (the output of
> GroupByKey transformation)
>
> When my Runner is SparkRunner, I get an exception:
>
>
>
> Caused by: java.lang.IllegalStateException: ValueIterator can't be
> iterated more than once,otherwise there could be data lost
>
> at
> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>
> at java.lang.Iterable.spliterator(Iterable.java:101)
>
>
>
>
>
> I understood I can branch the pipeline after GroupByKey into multiple
> transformation and iterate in each of them once on the Iterable.
>
>
>
> Is there a better way for that?
>
>
>
>
>
> *Noam Gershi*
>
> Software Developer
>
> *T*: +972 (3) 7405718
>
>
>
>


Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Reuven Lax
The Dataflow version does not spill to disk. However Spark's design might
require spilling to disk if you want that to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek  wrote:

> Hi,
>
> Spark's GBK is currently implemented using `sortBy(key and
> value).mapPartition(...)` for non-merging windowing in order to support
> large keys and large scale shuffles. Merging windowing is implemented using
> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
> which is by design unable to support large keys.
>
> As Jan noted, problem with mapPartition is, that its UDF receives an
> Iterator. Only option here is to wrap this iterator to one that spills to
> disk once an internal buffer is exceeded (the approach suggested by
> Reuven). This unfortunately comes with a cost in some cases. The best
> approach would be to somehow determine, that user wants multiple iterations
> and than wrap it in "re-iterator" if necessary. Does anyone have any ideas
> how to approach this?
>
> D.
>
> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax  wrote:
>
>> The Beam API was written to support multiple iterations, and there are
>> definitely transforms that do so. I believe that CoGroupByKey may do this
>> as well with the resulting iterator.
>>
>> I know that the Dataflow runner is able to handles iterators larger than
>> available memory by paging them in from shuffle, which still allows for
>> reiterating. It sounds like Spark is less flexible here?
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:
>>
>>> +dev  
>>>
>>> Lukasz, why do you think that users expect to be able to iterate
>>> multiple times grouped elements? Besides that it obviously suggests the
>>> 'Iterable'? The way that spark behaves is pretty much analogous to how
>>> MapReduce used to work - in certain cases it calles
>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>> accepts Iterator - that is because internally it merge sorts pre sorted
>>> segments. This approach enables to GroupByKey data sets that are too big to
>>> fit into memory (per key).
>>>
>>> If multiple iterations should be expected by users, we probably should:
>>>
>>>  a) include that in @ValidatesRunner tests
>>>
>>>  b) store values in memory on spark, which will break for certain
>>> pipelines
>>>
>>> Because of (b) I think that it would be much better to remove this
>>> "expectation" and clearly document that the Iterable is not supposed to be
>>> iterated multiple times.
>>>
>>> Jan
>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>
>>> I pretty much think so, because that is how Spark works. The Iterable
>>> inside is really an Iterator, which cannot be iterated multiple times.
>>>
>>> Jan
>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>
>>> Jan, in Beam users expect to be able to iterate the GBK output multiple
>>> times even from within the same ParDo.
>>> Is this something that Beam on Spark Runner never supported?
>>>
>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:
>>>
 Hi Gershi,

 could you please outline the pipeline you are trying to execute?
 Basically, you cannot iterate the Iterable multiple times in single ParDo.
 It should be possible, though, to apply multiple ParDos to output from
 GroupByKey.

 Jan
 On 9/26/19 3:32 PM, Gershi, Noam wrote:

 Hi,



 I want to iterate multiple times on the Iterable (the output of
 GroupByKey transformation)

 When my Runner is SparkRunner, I get an exception:



 Caused by: java.lang.IllegalStateException: ValueIterator can't be
 iterated more than once,otherwise there could be data lost

 at
 org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)

 at java.lang.Iterable.spliterator(Iterable.java:101)





 I understood I can branch the pipeline after GroupByKey into multiple
 transformation and iterate in each of them once on the Iterable.



 Is there a better way for that?





 *Noam Gershi*

 Software Developer

 *T*: +972 (3) 7405718






Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread David Morávek
Hi,

Spark's GBK is currently implemented using `sortBy(key and
value).mapPartition(...)` for non-merging windowing, in order to support
large keys and large-scale shuffles. Merging windowing is implemented using
the standard GBK (the underlying Spark impl. uses ListCombiner + Hash
Grouping), which is by design unable to support large keys.

As Jan noted, the problem with mapPartition is that its UDF receives an
Iterator. The only option here is to wrap this iterator in one that spills to
disk once an internal buffer is exceeded (the approach suggested by
Reuven). This unfortunately comes with a cost in some cases. The best
approach would be to somehow determine that the user wants multiple iterations
and then wrap it in a "re-iterator" if necessary. Does anyone have any ideas
how to approach this?
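
Just to make the idea concrete, a very rough in-memory sketch of such a
wrapper (the class and names are invented; a real version would track the
buffered size and spill to disk, e.g. via a coder, once a threshold is
exceeded, and would need to be thread-safe):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Wraps the one-shot Iterator handed to mapPartition so it can be re-iterated. */
class ReIterable<T> implements Iterable<T> {

  private final Iterator<T> source;                  // the underlying one-shot iterator
  private final List<T> buffer = new ArrayList<>();  // elements consumed so far, for replay
  // A real implementation would spill `buffer` to disk once its size
  // exceeds a configurable threshold.

  ReIterable(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int pos = 0;  // position of this iteration within the replay buffer

      @Override
      public boolean hasNext() {
        return pos < buffer.size() || source.hasNext();
      }

      @Override
      public T next() {
        if (pos < buffer.size()) {
          return buffer.get(pos++);  // replay an element seen by an earlier iteration
        }
        T value = source.next();     // pull a fresh element from the underlying iterator
        buffer.add(value);
        pos++;
        return value;
      }
    };
  }
}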

D.

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax  wrote:

> The Beam API was written to support multiple iterations, and there are
> definitely transforms that do so. I believe that CoGroupByKey may do this
> as well with the resulting iterator.
>
> I know that the Dataflow runner is able to handles iterators larger than
> available memory by paging them in from shuffle, which still allows for
> reiterating. It sounds like Spark is less flexible here?
>
> Reuven
>
> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:
>
>> +dev  
>>
>> Lukasz, why do you think that users expect to be able to iterate multiple
>> times grouped elements? Besides that it obviously suggests the 'Iterable'?
>> The way that spark behaves is pretty much analogous to how MapReduce used
>> to work - in certain cases it calles repartitionAndSortWithinPartitions and
>> then does mapPartition, which accepts Iterator - that is because internally
>> it merge sorts pre sorted segments. This approach enables to GroupByKey
>> data sets that are too big to fit into memory (per key).
>>
>> If multiple iterations should be expected by users, we probably should:
>>
>>  a) include that in @ValidatesRunner tests
>>
>>  b) store values in memory on spark, which will break for certain
>> pipelines
>>
>> Because of (b) I think that it would be much better to remove this
>> "expectation" and clearly document that the Iterable is not supposed to be
>> iterated multiple times.
>>
>> Jan
>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>
>> I pretty much think so, because that is how Spark works. The Iterable
>> inside is really an Iterator, which cannot be iterated multiple times.
>>
>> Jan
>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>
>> Jan, in Beam users expect to be able to iterate the GBK output multiple
>> times even from within the same ParDo.
>> Is this something that Beam on Spark Runner never supported?
>>
>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:
>>
>>> Hi Gershi,
>>>
>>> could you please outline the pipeline you are trying to execute?
>>> Basically, you cannot iterate the Iterable multiple times in single ParDo.
>>> It should be possible, though, to apply multiple ParDos to output from
>>> GroupByKey.
>>>
>>> Jan
>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> I want to iterate multiple times on the Iterable (the output of
>>> GroupByKey transformation)
>>>
>>> When my Runner is SparkRunner, I get an exception:
>>>
>>>
>>>
>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>> iterated more than once,otherwise there could be data lost
>>>
>>> at
>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>
>>> at java.lang.Iterable.spliterator(Iterable.java:101)
>>>
>>>
>>>
>>>
>>>
>>> I understood I can branch the pipeline after GroupByKey into multiple
>>> transformation and iterate in each of them once on the Iterable.
>>>
>>>
>>>
>>> Is there a better way for that?
>>>
>>>
>>>
>>>
>>>
>>> *Noam Gershi*
>>>
>>> Software Developer
>>>
>>> *T*: +972 (3) 7405718
>>>
>>>
>>>
>>>


Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Reuven Lax
The Beam API was written to support multiple iterations, and there are
definitely transforms that do so. I believe that CoGroupByKey may do this
as well with the resulting iterator.

I know that the Dataflow runner is able to handle iterators larger than
available memory by paging them in from shuffle, which still allows for
reiterating. It sounds like Spark is less flexible here?

Reuven

On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:

> +dev  
>
> Lukasz, why do you think that users expect to be able to iterate multiple
> times grouped elements? Besides that it obviously suggests the 'Iterable'?
> The way that spark behaves is pretty much analogous to how MapReduce used
> to work - in certain cases it calles repartitionAndSortWithinPartitions and
> then does mapPartition, which accepts Iterator - that is because internally
> it merge sorts pre sorted segments. This approach enables to GroupByKey
> data sets that are too big to fit into memory (per key).
>
> If multiple iterations should be expected by users, we probably should:
>
>  a) include that in @ValidatesRunner tests
>
>  b) store values in memory on spark, which will break for certain pipelines
>
> Because of (b) I think that it would be much better to remove this
> "expectation" and clearly document that the Iterable is not supposed to be
> iterated multiple times.
>
> Jan
> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>
> I pretty much think so, because that is how Spark works. The Iterable
> inside is really an Iterator, which cannot be iterated multiple times.
>
> Jan
> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>
> Jan, in Beam users expect to be able to iterate the GBK output multiple
> times even from within the same ParDo.
> Is this something that Beam on Spark Runner never supported?
>
> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:
>
>> Hi Gershi,
>>
>> could you please outline the pipeline you are trying to execute?
>> Basically, you cannot iterate the Iterable multiple times in single ParDo.
>> It should be possible, though, to apply multiple ParDos to output from
>> GroupByKey.
>>
>> Jan
>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>
>> Hi,
>>
>>
>>
>> I want to iterate multiple times on the Iterable (the output of
>> GroupByKey transformation)
>>
>> When my Runner is SparkRunner, I get an exception:
>>
>>
>>
>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>> iterated more than once,otherwise there could be data lost
>>
>> at
>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>
>> at java.lang.Iterable.spliterator(Iterable.java:101)
>>
>>
>>
>>
>>
>> I understood I can branch the pipeline after GroupByKey into multiple
>> transformation and iterate in each of them once on the Iterable.
>>
>>
>>
>> Is there a better way for that?
>>
>>
>>
>>
>>
>> *Noam Gershi*
>>
>> Software Developer
>>
>> *T*: +972 (3) 7405718
>>
>>
>>
>>


Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Shannon Duncan
I see two main options here.

Create an in-memory Iterable as you do your first iteration (a poor
implementation IMO).

Or separate your iterations into distinct DoFns and apply each of them to the
PCollection output of the shuffle. There are many different paths, but finding
the most parallel one is probably best.
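
The second option looks roughly like this (Beam Java sketch; it assumes a
keyed PCollection<KV<String, Integer>> called "keyed", and the two example
aggregations are invented for illustration):

PCollection<KV<String, Iterable<Integer>>> grouped =
    keyed.apply(GroupByKey.create());

// First pass, as its own DoFn over the grouped output:
PCollection<KV<String, Integer>> sums =
    grouped.apply("SumPerKey",
        ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            int sum = 0;
            for (int v : c.element().getValue()) {
              sum += v;
            }
            c.output(KV.of(c.element().getKey(), sum));
          }
        }));

// Second pass, as a separate DoFn over the same grouped PCollection, so each
// DoFn iterates its Iterable exactly once:
PCollection<KV<String, Integer>> maxes =
    grouped.apply("MaxPerKey",
        ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            int max = Integer.MIN_VALUE;
            for (int v : c.element().getValue()) {
              max = Math.max(max, v);
            }
            c.output(KV.of(c.element().getKey(), max));
          }
        }));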

- Shannon

On Fri, Sep 27, 2019 at 5:04 AM Jan Lukavský  wrote:

> +dev  
>
> Lukasz, why do you think that users expect to be able to iterate multiple
> times grouped elements? Besides that it obviously suggests the 'Iterable'?
> The way that spark behaves is pretty much analogous to how MapReduce used
> to work - in certain cases it calles repartitionAndSortWithinPartitions and
> then does mapPartition, which accepts Iterator - that is because internally
> it merge sorts pre sorted segments. This approach enables to GroupByKey
> data sets that are too big to fit into memory (per key).
>
> If multiple iterations should be expected by users, we probably should:
>
>  a) include that in @ValidatesRunner tests
>
>  b) store values in memory on spark, which will break for certain pipelines
>
> Because of (b) I think that it would be much better to remove this
> "expectation" and clearly document that the Iterable is not supposed to be
> iterated multiple times.
>
> Jan
> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>
> I pretty much think so, because that is how Spark works. The Iterable
> inside is really an Iterator, which cannot be iterated multiple times.
>
> Jan
> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>
> Jan, in Beam users expect to be able to iterate the GBK output multiple
> times even from within the same ParDo.
> Is this something that Beam on Spark Runner never supported?
>
> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:
>
>> Hi Gershi,
>>
>> could you please outline the pipeline you are trying to execute?
>> Basically, you cannot iterate the Iterable multiple times in single ParDo.
>> It should be possible, though, to apply multiple ParDos to output from
>> GroupByKey.
>>
>> Jan
>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>
>> Hi,
>>
>>
>>
>> I want to iterate multiple times on the Iterable (the output of
>> GroupByKey transformation)
>>
>> When my Runner is SparkRunner, I get an exception:
>>
>>
>>
>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>> iterated more than once,otherwise there could be data lost
>>
>> at
>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>
>> at java.lang.Iterable.spliterator(Iterable.java:101)
>>
>>
>>
>>
>>
>> I understood I can branch the pipeline after GroupByKey into multiple
>> transformation and iterate in each of them once on the Iterable.
>>
>>
>>
>> Is there a better way for that?
>>
>>
>>
>>
>>
>> *Noam Gershi*
>>
>> Software Developer
>>
>> *T*: +972 (3) 7405718
>>
>>
>>
>>


Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Jan Lukavský

+dev 

Lukasz, why do you think that users expect to be able to iterate the 
grouped elements multiple times? Besides the fact that the 'Iterable' type 
obviously suggests it? The way that Spark behaves is pretty much analogous 
to how MapReduce used to work - in certain cases it calls 
repartitionAndSortWithinPartitions and then does mapPartition, which 
accepts an Iterator - that is because internally it merge-sorts pre-sorted 
segments. This approach makes it possible to GroupByKey data sets that are 
too big (per key) to fit into memory.
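
The Spark pattern I mean looks roughly like this (Spark 2.x Java API sketch;
it assumes a JavaSparkContext `sc`, and the per-key sum and all names are
invented for illustration):

int numPartitions = 4;
JavaPairRDD<String, Integer> keyed =
    sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));

JavaRDD<String> perKeySums =
    keyed
        // all values of a key land in one partition, sorted by key within it
        .repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions))
        .mapPartitions(iter -> {
          // `iter` is a one-shot Iterator; each key's group is a consecutive
          // run of elements, so it never has to be materialized in memory -
          // but it also cannot be replayed.
          List<String> out = new ArrayList<>();
          String currentKey = null;
          long sum = 0;
          while (iter.hasNext()) {
            Tuple2<String, Integer> kv = iter.next();
            if (currentKey != null && !currentKey.equals(kv._1())) {
              out.add(currentKey + "=" + sum);  // previous key's group is complete
              sum = 0;
            }
            currentKey = kv._1();
            sum += kv._2();
          }
          if (currentKey != null) {
            out.add(currentKey + "=" + sum);
          }
          return out.iterator();
        });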


If multiple iterations should be expected by users, we probably should:

 a) include that in @ValidatesRunner tests

 b) store values in memory on Spark, which will break for certain pipelines

Because of (b) I think that it would be much better to remove this 
"expectation" and clearly document that the Iterable is not supposed to 
be iterated multiple times.


Jan

On 9/27/19 9:27 AM, Jan Lukavský wrote:


I pretty much think so, because that is how Spark works. The Iterable 
inside is really an Iterator, which cannot be iterated multiple times.


Jan

On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to iterate the GBK output 
multiple times even from within the same ParDo.

Is this something that Beam on Spark Runner never supported?

On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský wrote:


Hi Gershi,

could you please outline the pipeline you are trying to execute?
Basically, you cannot iterate the Iterable multiple times in
single ParDo. It should be possible, though, to apply multiple
ParDos to output from GroupByKey.

Jan

On 9/26/19 3:32 PM, Gershi, Noam wrote:


Hi,

I want to iterate multiple times on the Iterable (the output
of GroupByKey transformation)

When my Runner is SparkRunner, I get an exception:

Caused by: java.lang.IllegalStateException: ValueIterator can't
be iterated more than once,otherwise there could be data lost

    at

org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)

    at java.lang.Iterable.spliterator(Iterable.java:101)

I understood I can branch the pipeline after GroupByKey into
multiple transformation and iterate in each of them once on the
Iterable.

Is there a better way for that?

*Noam Gershi*

Software Developer

*T*: +972 (3) 7405718