As I said, that is just a hack. If you use the global window, you can
achieve basically the same functionality by first assigning your
elements to session windows with something like
input.apply(Window.into(Sessions.withGapDuration(
    /* something really huge */)))
Because session windowing is a merging windowing strategy, it disables
the non-merging grouping optimization (the one that prevents you from
re-iterating the GBK result), so your code should start working.
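To see why a really huge gap gives basically the same grouping as the global window, here is a small plain-Java model of how session windows merge for one key. It is not Beam code, and the class and method names are hypothetical. Each element gets a proto-window [t, t + gap), and overlapping proto-windows are merged into one session. Once the gap is larger than the time span of a key's elements, they all land in a single session, so the key gets one group, as it would in the global window.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java model of session-window merging for a single key.
// It is an illustration only, not part of the Beam API.
final class SessionMergeModel {
    private SessionMergeModel() {}

    /** Half-open interval [start, end), in milliseconds. */
    static final class Interval {
        final long start;
        final long end;

        Interval(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Gives each timestamp the proto-window [t, t + gapMillis) and merges
     * overlapping proto-windows into sessions. Keep gapMillis well below
     * Long.MAX_VALUE so that t + gapMillis cannot overflow.
     */
    static List<Interval> mergeSessions(List<Long> timestamps, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Interval> sessions = new ArrayList<>();
        for (long t : sorted) {
            Interval last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && t < last.end) {
                // The new proto-window overlaps the current session: extend it.
                long end = Math.max(last.end, t + gapMillis);
                sessions.set(sessions.size() - 1, new Interval(last.start, end));
            } else {
                // No overlap: start a new session.
                sessions.add(new Interval(t, t + gapMillis));
            }
        }
        return sessions;
    }
}
```

For example, timestamps at 0 s, 5 s and 600 s form two sessions with a 60-second gap, but a single session once the gap is ten years.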
Jan
On 10/24/19 3:08 PM, Gershi, Noam wrote:
Thanks!
I changed it to the split approach (attached), and it works.
Can you explain how to implement it with merging windows (just for my
own knowledge)?
*From:* Jan Lukavský <[email protected]>
*Sent:* Thursday, October 24, 2019 2:19 PM
*To:* [email protected]
*Subject:* Re: Iterating filtered Grouped elements on SparkRunner
Hi Noam,
we are working towards fixing this bug so that your code would work,
but the fix will not land before version 2.18.0 (and I cannot promise
even that :-)). In the meantime, you have two options:
a) use merging windowing; this unfortunately carries some performance
penalty and is more of a dirty hack than anything else, but it might
work
b) split the logic into two parts: first calculate the cardinality of
each group (you can use Count.perKey [1]), then join it with the GBK
result on the key (e.g. [2] or [3]), filter using the calculated
cardinality, and calculate your result
Option (b) should actually be more efficient if you have large groups,
because the cardinality calculation can be parallelised: Count.perKey
is a combining transform, so partial counts can be computed before the
shuffle.
Hope this helps; please feel free to ask any more questions.
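The dataflow of option (b) can be modelled in plain Java with in-memory maps. This is not Beam code: in a pipeline the three steps would be Count.perKey [1], a join on the key such as [2] or [3], and a ParDo. The filter rule (keep groups with at least minSize elements) and the summing of values are hypothetical stand-ins for the actual logic in the attached code. The point is that each kept group's values are walked only once, because the filter decision comes from the separately computed count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of option (b); names and the filter rule are illustrative only.
final class CountJoinFilterModel {
    private CountJoinFilterModel() {}

    /** Step 1: cardinality per key (the role Count.perKey plays in a pipeline). */
    static Map<String, Long> countPerKey(List<Map.Entry<String, Integer>> input) {
        Map<String, Long> counts = new HashMap<>();
        for (Map.Entry<String, Integer> kv : input) {
            counts.merge(kv.getKey(), 1L, Long::sum);
        }
        return counts;
    }

    /** The GBK result: all values for each key. */
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> input) {
        Map<String, List<Integer>> groups = new HashMap<>();
        for (Map.Entry<String, Integer> kv : input) {
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return groups;
    }

    /**
     * Steps 2 and 3: join the counts with the groups on the key, keep groups whose
     * count is at least minSize, and iterate each kept group exactly once.
     */
    static Map<String, Integer> sumOfLargeGroups(List<Map.Entry<String, Integer>> input, long minSize) {
        Map<String, Long> counts = countPerKey(input);
        Map<String, List<Integer>> groups = groupByKey(input);
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            long count = counts.get(group.getKey()); // inner join on the key
            if (count < minSize) {
                continue; // filtered out without touching the values
            }
            int sum = 0;
            for (int v : group.getValue()) { // single pass over the group
                sum += v;
            }
            result.put(group.getKey(), sum);
        }
        return result;
    }
}
```

With input a=1, a=2, a=3, b=10 and minSize 2, only key a survives, with sum 6.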
Jan
[1] https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/transforms/Count.html
[2] https://beam.apache.org/documentation/sdks/java/euphoria/#join
[3] https://beam.apache.org/documentation/sdks/java-extensions/#join-library
On 10/24/19 11:46 AM, Gershi, Noam wrote:
Hi,
I would like to:
1. Group elements
2. Filter out some groups
3. Iterate over the remaining groups and calculate on their elements
When running with the SparkRunner, I get an exception:
Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
    at java.lang.Iterable.spliterator(Iterable.java:101)
    at com.company.Main$2.processElement(Main.java:65)
Code attached
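The failure mode can be reproduced outside Spark with a hypothetical single-pass Iterable. This is a model for illustration, not the runner's actual ValueIterator class. Going by the exception message, the grouped values handed to the DoFn on this code path can be iterated only once. Any code that walks a group once to decide whether to keep it (here, counting its size) and then walks it again to compute the result fails on the second pass, and only for the groups that pass the filter.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-pass Iterable mimicking the behaviour in the stack trace above.
// It is an illustration only, not the Spark runner's implementation.
final class OneShotIterable<T> implements Iterable<T> {
    private final List<T> values;
    private boolean consumed = false;

    OneShotIterable(List<T> values) {
        this.values = new ArrayList<>(values);
    }

    @Override
    public Iterator<T> iterator() {
        if (consumed) {
            throw new IllegalStateException("can't be iterated more than once");
        }
        consumed = true;
        return values.iterator();
    }
}

final class FilterThenIterate {
    private FilterThenIterate() {}

    /** Filter on group size, then compute on the same Iterable: two passes. */
    static int sumIfLargeTwoPass(Iterable<Integer> group, int minSize) {
        int size = 0;
        for (int ignored : group) { // pass 1: count the elements
            size++;
        }
        if (size < minSize) {
            return 0; // filtered out: no second pass, so no exception
        }
        int sum = 0;
        for (int v : group) { // pass 2: throws on a one-shot Iterable
            sum += v;
        }
        return sum;
    }
}
```

The same method succeeds on an ordinary List, which is why such code can work on runners that return re-iterable groups.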
*Noam Gershi*
Software Developer
*T*: +972 (3) 7405718