On Tue, Nov 5, 2019 at 9:29 PM Aaron Dixon <atdi...@gmail.com> wrote:

> From https://beam.apache.org/documentation/programming-guide/#side-inputs
>
> > If the side input has multiple trigger firings, Beam uses the value
> > from the latest trigger firing. This is particularly useful if you use a
> > side input with a single global window and specify a trigger.
>

Sorry for this. The documentation is entirely wrong. If a side input has
multiple firings, then all elements from all firings are included in the
side input as though they are unrelated elements. I have filed
https://issues.apache.org/jira/browse/BEAM-8563 to at least prevent the
confusing result and give a good error message.
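
To make the failure mode concrete, here is a minimal plain-Java sketch. It
does not use Beam; asMapView and allFirings are hypothetical stand-ins I made
up to model the behavior described above, where the elements from every
firing reach the view together. With one firing the map builds; with two
firings of the same key it fails the same way View.asMap does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SideInputFiringsSketch {

  // Hypothetical stand-in for a map-style view: it rejects a key seen twice.
  static Map<String, Integer> asMapView(List<Map.Entry<String, Integer>> elements) {
    Map<String, Integer> view = new HashMap<>();
    for (Map.Entry<String, Integer> e : elements) {
      if (view.containsKey(e.getKey())) {
        throw new IllegalArgumentException("Duplicate values for " + e.getKey());
      }
      view.put(e.getKey(), e.getValue());
    }
    return view;
  }

  // Concatenates the output of every firing, as if the panes were unrelated elements.
  static List<Map.Entry<String, Integer>> allFirings(
      List<List<Map.Entry<String, Integer>>> panes) {
    List<Map.Entry<String, Integer>> all = new ArrayList<>();
    for (List<Map.Entry<String, Integer>> pane : panes) {
      all.addAll(pane);
    }
    return all;
  }

  public static void main(String[] args) {
    // Two firings of a per-key max for the same key in the same window.
    List<Map.Entry<String, Integer>> firstPane = List.of(Map.entry(":ihop", 3));
    List<Map.Entry<String, Integer>> secondPane = List.of(Map.entry(":ihop", 7));

    // One firing: the view builds without error.
    System.out.println(asMapView(allFirings(List.of(firstPane))));

    // Two firings: both elements reach the view, so the key collides.
    try {
      asMapView(allFirings(List.of(firstPane, secondPane)));
    } catch (IllegalArgumentException ex) {
      System.out.println(ex.getMessage());
    }
  }
}
```

The sketch only models the collision. It says nothing about how a runner
stores or orders panes.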

> I have this sub-pipeline:
>
> -> GlobalWindow (triggering AfterProcessingTime.pastFirstElementInPane)
>

Are you sure you want this? It will drop all data after the first firing.
We are about to disable such triggers due to the data loss risk. See
https://s.apache.org/finishing-triggers-drop-data. If your intent is to
drop all subsequent data, I am interested in your use case. Can you share
more?
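
For clarity on what I mean by dropping data, here is a small plain-Java
model. It does not use Beam; OneShotPane and its methods are hypothetical
names for illustration. It assumes a trigger that fires once and then
finishes, after which anything arriving in that window is discarded.

```java
import java.util.ArrayList;
import java.util.List;

public class FinishingTriggerSketch {

  // Hypothetical model of a window whose trigger fires once and then finishes.
  static final class OneShotPane {
    private final List<Integer> buffered = new ArrayList<>();
    private boolean finished = false;
    private int dropped = 0;

    // Buffers the value until the trigger has finished; afterwards it is dropped.
    void add(int value) {
      if (finished) {
        dropped++;
        return;
      }
      buffered.add(value);
    }

    // Emits what has been buffered so far and marks the trigger as finished.
    List<Integer> fire() {
      List<Integer> out = new ArrayList<>(buffered);
      buffered.clear();
      finished = true;
      return out;
    }

    int droppedCount() {
      return dropped;
    }
  }

  public static void main(String[] args) {
    OneShotPane pane = new OneShotPane();
    pane.add(1);
    pane.add(2);
    System.out.println(pane.fire());         // the only output this window produces
    pane.add(3);                             // arrives after the firing
    System.out.println(pane.droppedCount()); // number of values lost
  }
}
```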


> -> Combine.perKey (Max)
> -> View.asMap
> ...which I use as a side input.
>
> But I get a "Duplicate values for <key>" error (DirectRunner). (Stack
> trace below.)
>
> But the only way for duplicate keys to come out of the global window is
> via multiple triggers.
>
> What am I missing?
>

This is surprising. Can you share the actual code of your pipeline?
According to your pseudocode, this is impossible. The trigger you described
should never fire multiple times. But as I mentioned above, the trigger is
about to be forbidden. If we can learn about your usage, maybe that will
help.

Kenn


>
>
>
> ===
> java.lang.IllegalArgumentException: Duplicate values for :ihop
>   at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:397)
>   at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:373)
>   at org.apache.beam.runners.direct.SideInputContainer$SideInputContainerSideInputReader.get(SideInputContainer.java:261)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:247)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:74)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:548)
>
>
>
