Thanks!

I changed it to split (attached), and it works.

Can you explain how to implement it with merging windows (just for my own knowledge)?


From: [seznam.cz] Jan Lukavský <[email protected]>
Sent: Thursday, October 24, 2019 2:19 PM
To: [email protected]
Subject: Re: Iterating filtered Grouped elements on SparkRunner


Hi Noam,

we are working on fixing this bug so that your code will work, but the fix will 
not land before version 2.18.0 (and I cannot promise even that :-)). In the 
meantime, you have two options:

  a) use merging windowing - that will unfortunately mean some performance 
penalty and is more of a dirty hack than anything else, but it might work

  b) split the logic into two parts - one part to calculate the cardinality of 
each group (you can use Count.perKey [1]), then join this on the keys of the GBK 
result (e.g. [2] or [3]), filter there using the calculated cardinality, and 
calculate your result

Option (b) should actually be more efficient if you have large groups, 
because the calculation of the cardinality can be parallelised.
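
To make the data flow of option (b) concrete, here is a minimal sketch using 
plain Java collections rather than Beam itself. countPerKey stands in for 
Count.perKey, and the lookup into the resulting map stands in for the join 
against the grouped result. The class name, the method names, the minSize 
threshold and the summing step are all hypothetical; the real filter and 
calculation depend on the attached code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-collections analogue of option (b); not Beam code.
class CardinalityFilterSketch {

    // Pass 1: count the elements of each key (the role Count.perKey plays).
    static Map<String, Long> countPerKey(List<Map.Entry<String, Integer>> input) {
        return input.stream().collect(
            Collectors.groupingBy(Map.Entry::getKey, TreeMap::new, Collectors.counting()));
    }

    // Pass 2: look up each element's group size (the "join"), keep only groups
    // with at least minSize elements, then compute a per-key result.
    // Summing is an assumed placeholder for the real calculation.
    static Map<String, Integer> sumOfLargeGroups(
            List<Map.Entry<String, Integer>> input, long minSize) {
        Map<String, Long> counts = countPerKey(input);
        return input.stream()
            .filter(e -> counts.get(e.getKey()) >= minSize)
            .collect(Collectors.groupingBy(
                Map.Entry::getKey, TreeMap::new,
                Collectors.summingInt(Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
            Map.entry("a", 1), Map.entry("a", 2), Map.entry("a", 3),
            Map.entry("b", 10));
        // Key "b" has a single element and is filtered out.
        System.out.println(sumOfLargeGroups(input, 2)); // prints {a=6}
    }
}
```

Because the group size comes from its own pass, the grouped values should only 
need to be traversed once, for the final calculation.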

Hope this helps, please feel free to ask any more questions.

Jan

[1] 
https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/transforms/Count.html

[2] 
https://beam.apache.org/documentation/sdks/java/euphoria/#join

[3] 
https://beam.apache.org/documentation/sdks/java-extensions/#join-library


On 10/24/19 11:46 AM, Gershi, Noam wrote:
Hi,

I would like to:

1. Group elements

2. Then filter out some groups

3. Then iterate over the remaining groups and run a calculation on them

When running on the SparkRunner, I get an exception:

Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
    at java.lang.Iterable.spliterator(Iterable.java:101)
    at com.company.Main$2.processElement(Main.java:65)

Code attached


Noam Gershi
Software Developer
T: +972 (3) 7405718

Attachment: MainWithSplit.java