Hi,

a follow-up discussion on the dev@ mailing list more or less suggests that the current behavior of the Spark runner is a bug [1]. It will probably be fixed (a consensus on how to fix it has not yet been reached); in the meantime you would probably have to use some workaround. Can you describe the actual logic of your pipeline? Maybe we can suggest a way to compute what you need without reiterating the grouped elements.
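
For instance, if the passes you need are plain aggregations (pure speculation on my part until you share the pipeline), they can be fused into a single accumulator with one Combine.perKey, so the grouped values are consumed in a single pass and no re-iteration is needed:

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;

/** Hypothetical example, not from the Beam codebase: count + sum in one pass. */
class CountAndSumFn extends Combine.CombineFn<Integer, long[], KV<Long, Long>> {

  @Override
  public long[] createAccumulator() {
    return new long[] {0L, 0L};
  }

  @Override
  public long[] addInput(long[] acc, Integer value) {
    acc[0] += 1;     // count
    acc[1] += value; // sum
    return acc;
  }

  @Override
  public long[] mergeAccumulators(Iterable<long[]> accumulators) {
    long[] merged = new long[] {0L, 0L};
    for (long[] a : accumulators) {
      merged[0] += a[0];
      merged[1] += a[1];
    }
    return merged;
  }

  @Override
  public KV<Long, Long> extractOutput(long[] acc) {
    return KV.of(acc[0], acc[1]);
  }
}

// usage (input is an assumed PCollection<KV<String, Integer>>):
// input.apply(Combine.perKey(new CountAndSumFn()));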

Jan

[1] https://lists.apache.org/thread.html/5e71c98a32bcbc536f55f91f3e81e90ab7384c6ea56bdb30a92fcb31@%3Cdev.beam.apache.org%3E

On 9/29/19 1:54 PM, Gershi, Noam wrote:

Hi,

Thanks for the reply.

So, re-iteration on grouped elements is runner-dependent. Flink & Dataflow allow it, while Spark doesn't.

Since we are also investigating the runners here, does anyone have a list of which runners allow or don't allow re-iteration?

Noam

*From:* Kenneth Knowles <[email protected]>
*Sent:* Friday, September 27, 2019 7:26 PM
*To:* dev
*Cc:* user
*Subject:* Re: Multiple iterations after GroupByKey with SparkRunner

I am pretty surprised that we do not have a @Category(ValidatesRunner) test in GroupByKeyTest that iterates multiple times. That is a major oversight. We should have this test, and it can be disabled by the SparkRunner's configuration.
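
Roughly the shape such a test could take (a sketch only; names and the TestPipeline `p` are assumed, this is not existing GroupByKeyTest code):

@Test
@Category(ValidatesRunner.class)
public void testGroupByKeyWithMultipleIterations() {
  PCollection<Integer> sums =
      p.apply(Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3)))
          .apply(GroupByKey.create())
          .apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Integer>() {
            @ProcessElement
            public void process(ProcessContext c) {
              int first = 0;
              int second = 0;
              for (int v : c.element().getValue()) { first += v; }
              // Second pass over the same Iterable; this is exactly what
              // the SparkRunner currently rejects.
              for (int v : c.element().getValue()) { second += v; }
              if (first != second) {
                throw new IllegalStateException("re-iteration lost data");
              }
              c.output(second);
            }
          }));

  PAssert.that(sums).containsInAnyOrder(6);
  p.run();
}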

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:

    The Dataflow version does not spill to disk. However, Spark's
    design might require spilling to disk if you want that to be
    implemented properly.

    On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:

        Hi,

        Spark's GBK is currently implemented using `sortBy(key and
        value).mapPartition(...)` for non-merging windowing, in order
        to support large keys and large-scale shuffles. Merging
        windowing is implemented using the standard Spark GBK (the
        underlying implementation uses ListCombiner + hash grouping),
        which is by design unable to support large keys.
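
        To make the mechanics concrete, here is a tiny self-contained
        sketch (plain Java with made-up names, not the actual runner
        code) of grouping over a sorted, one-shot iterator:

        import java.util.Iterator;
        import java.util.List;
        import java.util.Map;

        public class SortedGroupingSketch {
          public static void main(String[] args) {
            // Stand-in for the sorted partition handed to mapPartition(...).
            Iterator<Map.Entry<String, Integer>> it = List.of(
                Map.entry("a", 1), Map.entry("a", 2),
                Map.entry("b", 3), Map.entry("b", 4)).iterator();

            Map.Entry<String, Integer> lookahead = it.hasNext() ? it.next() : null;
            while (lookahead != null) {
              String key = lookahead.getKey();
              System.out.print(key + " -> [ ");
              // Values are consumed straight off the shared iterator; once
              // read they are gone, so a second pass over a key's "Iterable"
              // would have to re-read the shuffle or buffer/spill the values.
              while (lookahead != null && lookahead.getKey().equals(key)) {
                System.out.print(lookahead.getValue() + " ");
                lookahead = it.hasNext() ? it.next() : null;
              }
              System.out.println("]");
            }
          }
        }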

        As Jan noted, the problem with mapPartition is that its UDF
        receives an Iterator. The only option here is to wrap this
        iterator in one that spills to disk once an internal buffer is
        exceeded (the approach suggested by Reuven). This
        unfortunately comes with a cost in some cases. The best
        approach would be to somehow determine that the user wants
        multiple iterations and then wrap it in a "re-iterator" only
        if necessary. Does anyone have ideas on how to approach this?
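
        For reference, a rough sketch of such a spilling wrapper
        (made-up names; this is not an actual Beam or Spark class):

        import java.io.*;
        import java.util.*;

        /** Keeps the first maxInMemory elements in a buffer, spills the
         *  rest to a temp file, and can thus be iterated more than once. */
        public class SpillingReiterable<T extends Serializable>
            implements Iterable<T> {
          private final List<T> buffer = new ArrayList<>();
          private File spillFile;   // created lazily on overflow
          private long spilledCount;

          public SpillingReiterable(Iterator<T> source, int maxInMemory)
              throws IOException {
            while (source.hasNext() && buffer.size() < maxInMemory) {
              buffer.add(source.next());
            }
            if (source.hasNext()) {
              spillFile = File.createTempFile("gbk-spill", ".bin");
              spillFile.deleteOnExit();
              try (ObjectOutputStream out =
                  new ObjectOutputStream(new FileOutputStream(spillFile))) {
                while (source.hasNext()) {
                  out.writeObject(source.next());
                  spilledCount++;
                }
              }
            }
          }

          @Override
          public Iterator<T> iterator() {
            Iterator<T> inMemory = buffer.iterator();
            if (spillFile == null) {
              return inMemory;
            }
            return new Iterator<T>() {
              private ObjectInputStream in; // opened once the buffer drains
              private long remaining = spilledCount;

              @Override
              public boolean hasNext() {
                return inMemory.hasNext() || remaining > 0;
              }

              @Override
              @SuppressWarnings("unchecked")
              public T next() {
                if (inMemory.hasNext()) {
                  return inMemory.next();
                }
                try {
                  if (in == null) {
                    in = new ObjectInputStream(new FileInputStream(spillFile));
                  }
                  T value = (T) in.readObject();
                  if (--remaining == 0) {
                    in.close(); // this pass is done; a new pass reopens
                  }
                  return value;
                } catch (IOException | ClassNotFoundException e) {
                  throw new NoSuchElementException(e.toString());
                }
              }
            };
          }
        }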

        D.

        On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:

            The Beam API was written to support multiple iterations,
            and there are definitely transforms that do so. I believe
            that CoGroupByKey may do this as well with the resulting
            iterator.
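
            A classic case that genuinely needs two passes (an
            illustrative sketch; `variancePerKey` is a made-up helper,
            not an existing transform) is per-key variance, where the
            second pass needs the mean computed by the first:

            import org.apache.beam.sdk.transforms.DoFn;
            import org.apache.beam.sdk.transforms.GroupByKey;
            import org.apache.beam.sdk.transforms.ParDo;
            import org.apache.beam.sdk.values.KV;
            import org.apache.beam.sdk.values.PCollection;

            static PCollection<KV<String, Double>> variancePerKey(
                PCollection<KV<String, Integer>> input) {
              return input
                  .apply(GroupByKey.create())
                  .apply(ParDo.of(
                      new DoFn<KV<String, Iterable<Integer>>, KV<String, Double>>() {
                        @ProcessElement
                        public void process(ProcessContext c) {
                          long n = 0;
                          long sum = 0;
                          for (int v : c.element().getValue()) { // pass 1
                            n++;
                            sum += v;
                          }
                          double mean = (double) sum / n;
                          double sq = 0;
                          for (int v : c.element().getValue()) { // pass 2
                            sq += (v - mean) * (v - mean);
                          }
                          c.output(KV.of(c.element().getKey(), sq / n));
                        }
                      }));
            }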

            I know that the Dataflow runner is able to handle
            iterators larger than available memory by paging them in
            from shuffle, which still allows for reiterating. It
            sounds like Spark is less flexible here?

            Reuven

            On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]> wrote:

                +dev <[email protected]>

                Lukasz, why do you think that users expect to be able
                to iterate grouped elements multiple times? Besides
                the fact that the 'Iterable' obviously suggests it?
                The way Spark behaves is pretty much analogous to how
                MapReduce used to work: in certain cases it calls
                repartitionAndSortWithinPartitions and then does
                mapPartition, which accepts an Iterator; that is
                because internally it merge-sorts pre-sorted segments.
                This approach makes it possible to GroupByKey data
                sets that are too big to fit into memory (per key).
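
                As a plain-Spark illustration of that pattern (a
                self-contained demo with made-up names, nothing
                Beam-specific):

                import java.util.Arrays;

                import org.apache.spark.HashPartitioner;
                import org.apache.spark.SparkConf;
                import org.apache.spark.api.java.JavaSparkContext;

                import scala.Tuple2;

                public class SortedShuffleDemo {
                  public static void main(String[] args) {
                    SparkConf conf =
                        new SparkConf().setMaster("local[2]").setAppName("demo");
                    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                      sc.parallelizePairs(Arrays.asList(
                              new Tuple2<>("a", 2),
                              new Tuple2<>("b", 3),
                              new Tuple2<>("a", 1)))
                          // shuffle once, sorting each partition by key
                          .repartitionAndSortWithinPartitions(new HashPartitioner(2))
                          .mapPartitions(it -> {
                            // `it` is a one-shot Iterator over the sorted
                            // partition; per-key groups carved out of it
                            // cannot be revisited without buffering.
                            StringBuilder sb = new StringBuilder();
                            while (it.hasNext()) {
                              Tuple2<String, Integer> kv = it.next();
                              sb.append(kv._1).append('=').append(kv._2).append(' ');
                            }
                            return Arrays.asList(sb.toString()).iterator();
                          })
                          .collect()
                          .forEach(System.out::println);
                    }
                  }
                }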

                If multiple iterations should be expected by users, we
                probably should:

                 a) include that in @ValidatesRunner tests

                 b) store values in memory on Spark, which will break
                for certain pipelines

                Because of (b), I think it would be much better to
                remove this "expectation" and clearly document that
                the Iterable is not supposed to be iterated multiple
                times.

                Jan

                On 9/27/19 9:27 AM, Jan Lukavský wrote:

                    I pretty much think so, because that is how Spark
                    works. The Iterable inside is really an Iterator,
                    which cannot be iterated multiple times.

                    Jan

                    On 9/27/19 2:00 AM, Lukasz Cwik wrote:

                        Jan, in Beam users expect to be able to
                        iterate the GBK output multiple times even
                        from within the same ParDo.

                        Is this something that Beam on the Spark
                        runner has never supported?

                        On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:

                            Hi Gershi,

                            could you please outline the pipeline you
                            are trying to execute? Basically, you
                            cannot iterate the Iterable multiple times
                            in a single ParDo. It should be possible,
                            though, to apply multiple ParDos to the
                            output from GroupByKey.
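
                            Schematically (a sketch with assumed
                            types; CountFn and SumFn are hypothetical
                            DoFns that each iterate the values once):

                            PCollection<KV<String, Iterable<Integer>>>
                                grouped = input.apply(GroupByKey.create());
                            grouped.apply("CountPass",
                                ParDo.of(new CountFn()));
                            grouped.apply("SumPass",
                                ParDo.of(new SumFn()));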

                            Jan

                            On 9/26/19 3:32 PM, Gershi, Noam wrote:

                                Hi,

                                I want to iterate multiple times on
                                the Iterable<V> (the output of the
                                GroupByKey transformation).

                                When my Runner is SparkRunner, I get
                                an exception:

                                Caused by:
                                java.lang.IllegalStateException:
                                ValueIterator can't be iterated more
                                than once, otherwise there could be
                                data lost

                                at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)

                                at java.lang.Iterable.spliterator(Iterable.java:101)

                                I understood that I can branch the
                                pipeline after GroupByKey into multiple
                                transformations and iterate once on the
                                Iterable<V> in each of them.

                                Is there a better way for that?

                                *Noam Gershi*

                                Software Developer

                                *T*: +972 (3) 7405718

