[jira] [Updated] (BEAM-5098) Combine.Globally::asSingletonView clears side inputs
[ https://issues.apache.org/jira/browse/BEAM-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-5098:
----------------------------------
    Component/s: sdk-java-core (was: beam-model)

> Combine.Globally::asSingletonView clears side inputs
> ----------------------------------------------------
>
>                 Key: BEAM-5098
>                 URL: https://issues.apache.org/jira/browse/BEAM-5098
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.5.0
>            Reporter: Mike Pedersen
>            Assignee: Kenneth Knowles
>            Priority: Critical
>              Labels: beginner, starter
>
> It seems like calling .asSingletonView on Combine.Globally clears all side
> inputs. Take this code for example:
>
> {code:java}
> public class Main {
>     public static void main(String[] args) {
>         PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline p = Pipeline.create(options);
>         PCollection<Integer> a = p.apply(Create.of(1, 2, 3));
>         PCollectionView<Integer> b = p.apply(Create.of(10)).apply(View.asSingleton());
>         a.apply(Combine.globally(new CombineWithContext.CombineFnWithContext<Integer, Integer, Integer>() {
>             @Override
>             public Integer createAccumulator(CombineWithContext.Context c) {
>                 return c.sideInput(b);
>             }
>             @Override
>             public Integer addInput(Integer accumulator, Integer input, CombineWithContext.Context c) {
>                 return accumulator + input;
>             }
>             @Override
>             public Integer mergeAccumulators(Iterable<Integer> accumulators, CombineWithContext.Context c) {
>                 int sum = 0;
>                 for (int i : accumulators) {
>                     sum += i;
>                 }
>                 return sum;
>             }
>             @Override
>             public Integer extractOutput(Integer accumulator, CombineWithContext.Context c) {
>                 return accumulator;
>             }
>             @Override
>             public Integer defaultValue() {
>                 return 0;
>             }
>         }).withSideInputs(b).asSingletonView());
>         p.run().waitUntilFinish();
>     }
> }{code}
> This fails with the following exception:
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: calling sideInput() with unknown view
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>     at Main.main(Main.java:287)
> Caused by: java.lang.IllegalArgumentException: calling sideInput() with unknown view
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:212)
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:69)
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:489)
>     at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$1.sideInput(Combine.java:2137)
>     at Main$1.createAccumulator(Main.java:258)
>     at Main$1.createAccumulator(Main.java:255)
>     at org.apache.beam.sdk.transforms.CombineWithContext$CombineFnWithContext.apply(CombineWithContext.java:120)
>     at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2129){code}
> But if you change
> {code:java}
> .withSideInputs(b).asSingletonView()){code}
> to
> {code:java}
> .withSideInputs(b)).apply(View.asSingleton()){code}
> then it works just fine.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
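
Editorial note: the report does not say why the side inputs disappear. One plausible mechanism, which is an assumption and is not confirmed anywhere in this thread, is that asSingletonView() builds a new transform object without carrying over the side inputs registered by withSideInputs(). The toy model below uses hypothetical classes (Globally, AsSingletonView, and both asSingletonView* methods are invented for illustration and are not Beam's API) to sketch how a fluent method can silently drop configured state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SideInputDropSketch {

    /** Hypothetical stand-in for a combine transform that carries side inputs. */
    static final class Globally {
        private final List<String> sideInputs;

        Globally(List<String> sideInputs) {
            this.sideInputs = Collections.unmodifiableList(new ArrayList<>(sideInputs));
        }

        /** Returns a copy of this transform with the given views registered. */
        Globally withSideInputs(String... views) {
            List<String> merged = new ArrayList<>(sideInputs);
            merged.addAll(Arrays.asList(views));
            return new Globally(merged);
        }

        /** Assumed buggy shape: the new object is built without the side inputs. */
        AsSingletonView asSingletonViewDropping() {
            return new AsSingletonView(Collections.emptyList());
        }

        /** Assumed fixed shape: the side inputs are forwarded to the new object. */
        AsSingletonView asSingletonViewForwarding() {
            return new AsSingletonView(sideInputs);
        }
    }

    /** Hypothetical stand-in for the transform returned by asSingletonView(). */
    static final class AsSingletonView {
        private final List<String> sideInputs;

        AsSingletonView(List<String> sideInputs) {
            this.sideInputs = Collections.unmodifiableList(new ArrayList<>(sideInputs));
        }

        /** A lookup of an unregistered view is what would fail at run time. */
        boolean knowsView(String view) {
            return sideInputs.contains(view);
        }
    }

    public static void main(String[] args) {
        Globally combine = new Globally(Collections.emptyList()).withSideInputs("b");
        System.out.println(combine.asSingletonViewDropping().knowsView("b"));   // prints false
        System.out.println(combine.asSingletonViewForwarding().knowsView("b")); // prints true
    }
}
```

In this model the workaround from the report corresponds to never calling the dropping method: the combine keeps its side inputs, and the view is created by a separate step applied afterwards.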
[jira] [Updated] (BEAM-5098) Combine.Globally::asSingletonView clears side inputs
[ https://issues.apache.org/jira/browse/BEAM-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-5098:
----------------------------------
    Labels: beginner starter (was: )
[jira] [Updated] (BEAM-5098) Combine.Globally::asSingletonView clears side inputs
[ https://issues.apache.org/jira/browse/BEAM-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-5098:
----------------------------------
    Priority: Critical (was: Major)
[jira] [Updated] (BEAM-5098) Combine.Globally::asSingletonView clears side inputs
[ https://issues.apache.org/jira/browse/BEAM-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mike Pedersen updated BEAM-5098:
--------------------------------
    Description:
It seems like calling .asSingletonView on Combine.Globally clears all side inputs. Take this code for example:

{code:java}
public class Main {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
        PCollection<Integer> a = p.apply(Create.of(1, 2, 3));
        PCollectionView<Integer> b = p.apply(Create.of(10)).apply(View.asSingleton());
        a.apply(Combine.globally(new CombineWithContext.CombineFnWithContext<Integer, Integer, Integer>() {
            @Override
            public Integer createAccumulator(CombineWithContext.Context c) {
                return c.sideInput(b);
            }
            @Override
            public Integer addInput(Integer accumulator, Integer input, CombineWithContext.Context c) {
                return accumulator + input;
            }
            @Override
            public Integer mergeAccumulators(Iterable<Integer> accumulators, CombineWithContext.Context c) {
                int sum = 0;
                for (int i : accumulators) {
                    sum += i;
                }
                return sum;
            }
            @Override
            public Integer extractOutput(Integer accumulator, CombineWithContext.Context c) {
                return accumulator;
            }
            @Override
            public Integer defaultValue() {
                return 0;
            }
        }).withSideInputs(b).asSingletonView());
        p.run().waitUntilFinish();
    }
}{code}
This fails with the following exception:
{code:java}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: calling sideInput() with unknown view
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at Main.main(Main.java:287)
Caused by: java.lang.IllegalArgumentException: calling sideInput() with unknown view
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:212)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:489)
    at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$1.sideInput(Combine.java:2137)
    at Main$1.createAccumulator(Main.java:258)
    at Main$1.createAccumulator(Main.java:255)
    at org.apache.beam.sdk.transforms.CombineWithContext$CombineFnWithContext.apply(CombineWithContext.java:120)
    at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2129){code}
But if you change
{code:java}
.withSideInputs(b).asSingletonView()){code}
to
{code:java}
.withSideInputs(b)).apply(View.asSingleton()){code}
then it works just fine.

  was:
It seems like calling .asSingletonView on Combine.Globally clears all side inputs. Take this code for example:

{code:java}
public class Main {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
        PCollection<Integer> a = p.apply(Create.of(1, 2, 3));
        PCollectionView<Integer> b = p.apply(Create.of(10)).apply(View.asSingleton());
        a.apply(Combine.globally(new CombineWithContext.CombineFnWithContext<Integer, Integer, Integer>() {
            @Override
            public Integer createAccumulator(CombineWithContext.Context c) {
                return c.sideInput(b);
            }
            @Override
            public Integer addInput(Integer accumulator, Integer input, CombineWithContext.Context c) {
                return accumulator + input;
            }
            @Override
            public Integer mergeAccumulators(Iterable<Integer> accumulators, CombineWithContext.Context c) {
                int sum = 0;
                for (int i : accumulators) {
                    sum += i;
                }