On Tue, Nov 5, 2019 at 9:29 PM Aaron Dixon <atdi...@gmail.com> wrote:

> From https://beam.apache.org/documentation/programming-guide/#side-inputs
>
> > If the side input has multiple trigger firings, Beam uses the value
> > from the latest trigger firing. This is particularly useful if you use a
> > side input with a single global window and specify a trigger.
>

Sorry for this. The documentation is entirely wrong. If a side input has
multiple firings, then all elements from all firings are included in the
side input as though they are unrelated elements. I have filed
https://issues.apache.org/jira/browse/BEAM-8563 to at least prevent the
confusing result and give a good error message.

> I have this sub-pipeline:
>
> -> GlobalWindow (triggering AfterProcessingTime.pastFirstElementInPane)
>

Are you sure you want this? It will drop all data after the first firing.
We are about to disable such triggers due to the data loss risk. See
https://s.apache.org/finishing-triggers-drop-data. If your intent is to
drop all subsequent data, I am interested in your use case. Can you share
more?

> -> Combine.perKey (Max)
> -> View.asMap
> ...which I use as a side input.
>
> But I get a "Duplicate values for <key>" error (DirectRunner). (Stack
> trace below.)
>
> But the only way for duplicate keys to come out of the global window is
> via multiple triggers.
>
> What am I missing?
>

This is surprising. Can you share the actual code of your pipeline?
According to your pseudocode, this is impossible. The trigger you described
should never fire multiple times. But as I mentioned above, the trigger is
about to be forbidden. If we can learn about your usage, maybe that will
help.
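
To make the two behaviors described here concrete, a toy model in plain Java may help. It is only a sketch, not Beam's implementation and with no Beam dependency: the class `SideInputFiringModel` and its methods are hypothetical, and it assumes each firing emits the per-key max of the elements that arrived since the previous firing. It shows a one-shot trigger silently dropping later data, and repeated firings putting the same key into the side input twice, which makes a map view fail with "Duplicate values for <key>".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of trigger firings feeding a map-shaped side input.
public class SideInputFiringModel {

    // One "pane" = the per-key max of the elements seen since the last firing.
    static Map<String, Integer> maxPerKey(List<Map.Entry<String, Integer>> elements) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : elements) {
            out.merge(e.getKey(), e.getValue(), Math::max);
        }
        return out;
    }

    // Turns batches of arriving elements into panes. A one-shot trigger
    // (repeated == false) fires once and then finishes, so later batches are dropped.
    static List<Map<String, Integer>> panes(
            List<List<Map.Entry<String, Integer>>> batches, boolean repeated) {
        List<Map<String, Integer>> result = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> batch : batches) {
            result.add(maxPerKey(batch));
            if (!repeated) {
                break;
            }
        }
        return result;
    }

    // Every element of every pane lands in the side input as an unrelated
    // element; building a map from them rejects a key seen more than once.
    static Map<String, Integer> asMap(List<Map<String, Integer>> panes) {
        Map<String, Integer> view = new HashMap<>();
        for (Map<String, Integer> pane : panes) {
            for (Map.Entry<String, Integer> e : pane.entrySet()) {
                if (view.putIfAbsent(e.getKey(), e.getValue()) != null) {
                    throw new IllegalArgumentException("Duplicate values for " + e.getKey());
                }
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> batches = List.of(
                List.of(Map.entry(":ihop", 3), Map.entry(":ihop", 5)),
                List.of(Map.entry(":ihop", 7)));

        // One-shot trigger: the view builds, but the later value 7 is lost.
        System.out.println(asMap(panes(batches, false))); // {:ihop=5}

        // Repeated firings: both panes contain :ihop, so the map view fails.
        try {
            asMap(panes(batches, true));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Duplicate values for :ihop
        }
    }
}
```

In this model the only way to get a duplicate key is more than one pane, which matches the reasoning in the question; the one-shot trigger avoids the error only by discarding data.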
Kenn

> ===
> java.lang.IllegalArgumentException: Duplicate values for :ihop
>     at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:397)
>     at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:373)
>     at org.apache.beam.runners.direct.SideInputContainer$SideInputContainerSideInputReader.get(SideInputContainer.java:261)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:247)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:74)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:548)