[jira] [Resolved] (BEAM-1149) Side input access fails in direct runner (possibly others too) when input element in multiple windows
     [ https://issues.apache.org/jira/browse/BEAM-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles resolved BEAM-1149.
-----------------------------------
    Resolution: Fixed

> Side input access fails in direct runner (possibly others too) when input element in multiple windows
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-1149
>                 URL: https://issues.apache.org/jira/browse/BEAM-1149
>             Project: Beam
>          Issue Type: Bug
>            Reporter: Eugene Kirpichov
>            Assignee: Kenneth Knowles
>            Priority: Blocker
>             Fix For: 0.4.0-incubating
>
>
> {code:java}
>   private static class FnWithSideInputs extends DoFn<String, String> {
>     private final PCollectionView<Integer> view;
>     private FnWithSideInputs(PCollectionView<Integer> view) {
>       this.view = view;
>     }
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       c.output(c.element() + ":" + c.sideInput(view));
>     }
>   }
>   @Test
>   public void testSideInputsWithMultipleWindows() {
>     Pipeline p = TestPipeline.create();
>     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
>     mutableNow.setMillisOfSecond(0);
>     Instant now = mutableNow.toInstant();
>     SlidingWindows windowFn =
>         SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
>     PCollectionView<Integer> view =
>         p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
>     PCollection<String> res =
>         p.apply(Create.timestamped(TimestampedValue.of("a", now)))
>             .apply(Window.<String>into(windowFn))
>             .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
>     PAssert.that(res).containsInAnyOrder("a:1");
>     p.run();
>   }
> {code}
> This fails with the following exception:
> {code}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
> 	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
> 	at
> Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> 	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
> 	at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Resolved] (BEAM-1149) Side input access fails in direct runner (possibly others too) when input element in multiple windows
     [ https://issues.apache.org/jira/browse/BEAM-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eugene Kirpichov resolved BEAM-1149.

       Resolution: Fixed
    Fix Version/s: 0.4.0-incubating

> Side input access fails in direct runner (possibly others too) when input element in multiple windows
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-1149
>                 URL: https://issues.apache.org/jira/browse/BEAM-1149
>             Project: Beam
>          Issue Type: Bug
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Blocker
>             Fix For: 0.4.0-incubating
>
>
> {code:java}
>   private static class FnWithSideInputs extends DoFn<String, String> {
>     private final PCollectionView<Integer> view;
>     private FnWithSideInputs(PCollectionView<Integer> view) {
>       this.view = view;
>     }
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       c.output(c.element() + ":" + c.sideInput(view));
>     }
>   }
>   @Test
>   public void testSideInputsWithMultipleWindows() {
>     Pipeline p = TestPipeline.create();
>     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
>     mutableNow.setMillisOfSecond(0);
>     Instant now = mutableNow.toInstant();
>     SlidingWindows windowFn =
>         SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
>     PCollectionView<Integer> view =
>         p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
>     PCollection<String> res =
>         p.apply(Create.timestamped(TimestampedValue.of("a", now)))
>             .apply(Window.<String>into(windowFn))
>             .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
>     PAssert.that(res).containsInAnyOrder("a:1");
>     p.run();
>   }
> {code}
> This fails with the following exception:
> {code}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
> 	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
> 	at
> Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> 	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
> 	at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
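
For context on why the element in the report above ends up "in multiple windows": with a 5-second sliding window that advances every 1 second, any single timestamp falls into five overlapping windows. The sketch below reproduces only that assignment arithmetic in plain Java, without any Beam dependency. The class name `SlidingWindowSketch` and the method `windowStarts` are hypothetical names chosen for illustration, the window offset is assumed to be zero, and this is not the code in Beam's `SlidingWindows` or the fix that resolved this issue.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    /**
     * Returns the start time (in milliseconds) of every half-open window
     * [start, start + sizeMs) that contains timestampMs, assuming window
     * starts are aligned to multiples of periodMs (offset zero).
     * Starts are listed from the latest window to the earliest.
     */
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long periodMs) {
        List<Long> starts = new ArrayList<>();
        // Latest aligned window start that is at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, periodMs);
        // Walk backwards one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= periodMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Same shape as the report: size 5 s, period 1 s, timestamp on a whole second.
        List<Long> starts = windowStarts(10_000L, 5_000L, 1_000L);
        System.out.println(starts.size() + " windows: " + starts);
        // prints "5 windows: [10000, 9000, 8000, 7000, 6000]"
    }
}
```

Since the element "a" belongs to five windows, a runner that looks up the side input once per element rather than once per (element, window) pair has no single main-input window to map to a side-input window, which is the situation the `IllegalStateException` in the trace describes.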