Hi,
follow-up discussion on the dev@ mailing list more or less suggests that the
current behavior of the Spark runner is a bug [1]. It will probably be fixed
(a consensus about how to fix it has not yet been reached); in the
meantime you will probably have to use some workaround. Can you
describe the actual logic of your pipeline? Maybe we can suggest a way
to compute what you need without re-iterating the
grouped elements.
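For instance, if you need several aggregates per key, they can often be
computed in a single pass over the values. A rough sketch (the element
types and the sum/count aggregates are just placeholders for whatever
your pipeline actually computes):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Sketch only: compute all needed aggregates in one pass, so the
    // grouped Iterable is iterated exactly once.
    PCollection<KV<String, Iterable<Integer>>> grouped = /* output of GroupByKey */;
    grouped.apply(ParDo.of(
        new DoFn<KV<String, Iterable<Integer>>, KV<String, KV<Long, Long>>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            long sum = 0;
            long count = 0;
            for (Integer v : c.element().getValue()) { // single iteration
              sum += v;
              count++;
            }
            c.output(KV.of(c.element().getKey(), KV.of(sum, count)));
          }
        }));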
Jan
[1]
https://lists.apache.org/thread.html/5e71c98a32bcbc536f55f91f3e81e90ab7384c6ea56bdb30a92fcb31@%3Cdev.beam.apache.org%3E
On 9/29/19 1:54 PM, Gershi, Noam wrote:
Hi,
Thanks for the reply.
So re-iteration over grouped elements is runner-dependent: Flink and
Dataflow allow it, while Spark does not.
Since we are also investigating the runners here, does anyone have a list
of which runners allow or disallow re-iteration?
Noam
*From:* Kenneth Knowles <[email protected]>
*Sent:* Friday, September 27, 2019 7:26 PM
*To:* dev
*Cc:* user
*Subject:* Re: Multiple iterations after GroupByKey with SparkRunner
I am pretty surprised that we do not have a @Category(ValidatesRunner)
test in GroupByKeyTest that iterates multiple times. That is a major
oversight. We should have this test, and it can be disabled by the
SparkRunner's configuration.
Kenn
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:
The Dataflow version does not spill to disk. However, Spark's
design might require spilling to disk if you want that to be
implemented properly.
On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:
Hi,
Spark's GBK is currently implemented using `sortBy(key and
value).mapPartition(...)` for non-merging windowing, in order
to support large keys and large-scale shuffles. Merging
windowing is implemented using standard GBK (the underlying Spark
implementation uses ListCombiner + Hash Grouping), which is by design
unable to support large keys.
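To illustrate why the values can only be consumed once on the
non-merging path: after the sort, each partition is a single
forward-only iterator, and the per-key "iterables" are just views over
it. Roughly like this (a simplified illustration, not the actual runner
code; all names are made up):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Iterator;
    import java.util.Map.Entry;

    // Simplified sketch of grouping over a partition sorted by key.
    class SortedGroupingIterator<K, V> {
      private final Iterator<Entry<K, V>> sorted; // partition, sorted by key
      private Entry<K, V> lookahead;

      SortedGroupingIterator(Iterator<Entry<K, V>> sorted) {
        this.sorted = sorted;
        this.lookahead = sorted.hasNext() ? sorted.next() : null;
      }

      boolean hasNextGroup() {
        return lookahead != null;
      }

      // Returns the next key with a one-shot iterator over its values.
      // The values are a view over the single shared partition iterator,
      // so they cannot be replayed.
      Entry<K, Iterator<V>> nextGroup() {
        final K key = lookahead.getKey();
        Iterator<V> values = new Iterator<V>() {
          @Override public boolean hasNext() {
            return lookahead != null && lookahead.getKey().equals(key);
          }
          @Override public V next() {
            V v = lookahead.getValue();
            lookahead = sorted.hasNext() ? sorted.next() : null;
            return v;
          }
        };
        return new SimpleEntry<>(key, values);
      }
    }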
As Jan noted, the problem with mapPartition is that its UDF
receives an Iterator. The only option here is to wrap this
iterator in one that spills to disk once an internal buffer is
exceeded (the approach suggested by Reuven). This
unfortunately comes with a cost in some cases. The best
approach would be to somehow determine that the user wants
multiple iterations and then wrap it in a "re-iterator" if
necessary. Does anyone have any ideas how to approach this?
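A minimal sketch of the wrapping idea, with an in-memory cache standing
in for the buffer (a real implementation would spill the cache to disk
once it exceeds a threshold; all names are made up):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Sketch only: makes a one-shot Iterator re-iterable by caching
    // consumed elements in memory. A production version would spill
    // the cache to disk once it grows past a configurable threshold.
    class ReiterableView<T> implements Iterable<T> {
      private final Iterator<T> source;
      private final List<T> cache = new ArrayList<>();

      ReiterableView(Iterator<T> source) {
        this.source = source;
      }

      @Override
      public Iterator<T> iterator() {
        return new Iterator<T>() {
          private int pos = 0;

          @Override public boolean hasNext() {
            return pos < cache.size() || source.hasNext();
          }

          @Override public T next() {
            if (pos == cache.size()) {
              cache.add(source.next()); // first pass pulls from the source
            }
            return cache.get(pos++);
          }
        };
      }
    }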
D.
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:
The Beam API was written to support multiple iterations,
and there are definitely transforms that do so. I believe
that CoGroupByKey may do this as well with the resulting
iterator.
I know that the Dataflow runner is able to handle
iterators larger than available memory by paging them in
from shuffle, which still allows for reiterating. It
sounds like Spark is less flexible here?
Reuven
On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]> wrote:
+dev <[email protected]>
Lukasz, why do you think that users expect to be able
to iterate grouped elements multiple times? Besides
the fact that the 'Iterable' type obviously suggests it? The way
that Spark behaves is pretty much analogous to how
MapReduce used to work - in certain cases it calls
repartitionAndSortWithinPartitions and then does
mapPartition, which accepts an Iterator - that is because
internally it merge-sorts pre-sorted segments. This
approach makes it possible to GroupByKey data sets that are too
big to fit into memory (per key).
If multiple iterations should be expected by users, we
probably should:
a) include that in @ValidatesRunner tests
b) store values in memory on Spark, which will break
for certain pipelines
Because of (b), I think that it would be much better to
remove this "expectation" and clearly document that
the Iterable is not supposed to be iterated multiple
times.
Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:
I pretty much think so, because that is how Spark
works. The Iterable inside is really an Iterator,
which cannot be iterated multiple times.
Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to
iterate the GBK output multiple times, even
from within the same ParDo.
Is this something that Beam on the Spark runner
never supported?
On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:
Hi Gershi,
could you please outline the pipeline you
are trying to execute? Basically, you
cannot iterate the Iterable multiple times
in a single ParDo. It should be possible,
though, to apply multiple ParDos to the output
from GroupByKey.
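Roughly like this (SumFn and CountFn are hypothetical DoFns standing in
for your actual logic, each iterating the values once):

    PCollection<KV<String, Iterable<Integer>>> grouped =
        input.apply(GroupByKey.<String, Integer>create());

    // Each branch receives the grouped output independently and
    // iterates the Iterable exactly once.
    grouped.apply("Sum", ParDo.of(new SumFn()));
    grouped.apply("Count", ParDo.of(new CountFn()));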
Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:
Hi,
I want to iterate multiple times on
the Iterable<V> (the output of the
GroupByKey transformation).
When my runner is SparkRunner, I get
an exception:
Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once, otherwise there could be data lost
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
    at java.lang.Iterable.spliterator(Iterable.java:101)
I understood I can branch the pipeline
after GroupByKey into multiple
transformations and iterate once on the
Iterable<V> in each of them.
Is there a better way to do that?
*Noam Gershi*
Software Developer
*T*: +972 (3) 7405718