Re: Combine.Global
Ah, I found my mistake. You overrode getAccumulatorCoder and getDefaultOutputCoder, which my implementation did not. This approach is straightforward now. Thanks!

On 2017-04-07 23:46 (-0500), Aviem Zur wrote:
> I wasn't able to reproduce the issue you're experiencing.
> I've created a gist with an example that works and is similar to what you
> have described.
> Please help us make tweaks to the gist to reproduce your problem:
> https://gist.github.com/aviemzur/ba213d98b4484492099b3cf709ddded0
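The fix described here, where the combine function names its own accumulator and output coders instead of leaving them to inference, can be modeled in plain Java. This is a minimal sketch and not Beam code: `ModelCoder`, `CoderAwareFn`, `Tuple`, `TupleCoder`, `LongCoder` and `CountFn` are hypothetical stand-ins for Beam's `Coder`, `CombineFn`, `CustomTuple` and real coders, and the two hooks only play the role of `getAccumulatorCoder` and `getDefaultOutputCoder`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical minimal coder interface (a stand-in for Beam's Coder, not the real API).
interface ModelCoder<T> {
  byte[] encode(T value);
  T decode(byte[] bytes);
}

// Hypothetical combine fn base class. By default it cannot name its coders,
// which mirrors the "Unable to provide a default Coder" failure.
abstract class CoderAwareFn<InputT, AccumT, OutputT> {
  abstract AccumT createAccumulator();
  abstract AccumT addInput(AccumT acc, InputT input);
  abstract OutputT extractOutput(AccumT acc);

  ModelCoder<AccumT> accumulatorCoder() {
    throw new IllegalStateException("no accumulator coder available");
  }

  ModelCoder<OutputT> outputCoder() {
    throw new IllegalStateException("no output coder available");
  }
}

// Hypothetical output type standing in for CustomTuple.
final class Tuple {
  final String label;
  final long count;

  Tuple(String label, long count) {
    this.label = label;
    this.count = count;
  }
}

// Encodes a Tuple as: label length (int), UTF-8 label bytes, count (long).
class TupleCoder implements ModelCoder<Tuple> {
  public byte[] encode(Tuple t) {
    byte[] label = t.label.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(4 + label.length + 8)
        .putInt(label.length).put(label).putLong(t.count).array();
  }

  public Tuple decode(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    byte[] label = new byte[buf.getInt()];
    buf.get(label);
    return new Tuple(new String(label, StandardCharsets.UTF_8), buf.getLong());
  }
}

// Encodes a Long as 8 big-endian bytes.
class LongCoder implements ModelCoder<Long> {
  public byte[] encode(Long value) {
    return ByteBuffer.allocate(8).putLong(value).array();
  }

  public Long decode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }
}

// Counts String inputs. Input, accumulator and output types all differ, so
// the input coder cannot be reused; both coders are supplied explicitly.
class CountFn extends CoderAwareFn<String, Long, Tuple> {
  @Override Long createAccumulator() { return 0L; }
  @Override Long addInput(Long acc, String input) { return acc + 1; }
  @Override Tuple extractOutput(Long acc) { return new Tuple("count", acc); }
  @Override ModelCoder<Long> accumulatorCoder() { return new LongCoder(); }
  @Override ModelCoder<Tuple> outputCoder() { return new TupleCoder(); }
}
```

In the real SDK the corresponding overrides receive a `CoderRegistry` and the input `Coder` as arguments, as the `getDefaultOutputCoder(CoderRegistry, Coder)` signature mentioned later in this thread suggests; the model drops those parameters to stay self-contained.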
Re: Combine.Global
This looks like it might be because the output coder cannot be determined. The registry understands that it must build a KvCoder, but it cannot infer the coder for OutputT. More specifically, the stack trace contains the following line:

"Unable to provide a default Coder for java.lang.Object. Correct one of the following root causes:"

CombineFn provides a `getDefaultOutputCoder(CoderRegistry, Coder)` method, which may be suitable here for producing the coder for your outputs.

(I can produce a very similar stack trace: https://gist.github.com/tgroh/04d4b638e7fabf8a03187760ddb26eef)

On Fri, Apr 7, 2017 at 9:46 PM, Aviem Zur wrote:
> I wasn't able to reproduce the issue you're experiencing.
> I've created a gist with an example that works and is similar to what you
> have described.
> Please help us make tweaks to the gist to reproduce your problem:
> https://gist.github.com/aviemzur/ba213d98b4484492099b3cf709ddded0
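Why would the registry end up looking for a coder for java.lang.Object at all? One plausible cause, offered as an assumption rather than a diagnosis of this particular CombineFn, is Java type erasure: OutputT can only be recovered by reflection when some class declaration binds it to a concrete type. The sketch below uses plain `java.lang.reflect` (not Beam's own type-resolution code) and the hypothetical classes `FnBase`, `ConcreteFn`, `GenericFn` and `OutputTypeProbe`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a combine fn's generic signature.
abstract class FnBase<InputT, AccumT, OutputT> {}

// OutputT is bound to String in the class declaration, so it survives erasure.
class ConcreteFn extends FnBase<Integer, StringBuilder, String> {}

// OutputT is only the type variable T here; a GenericFn<Long> instance
// carries no record of Long at runtime.
class GenericFn<T> extends FnBase<T, T, T> {}

class OutputTypeProbe {
  // Returns the class bound to OutputT (the third type argument of FnBase)
  // when reflection can see one, and Object.class otherwise.
  // Only the direct superclass is inspected, which is enough for this sketch.
  static Class<?> resolvedOutputType(FnBase<?, ?, ?> fn) {
    Type superType = fn.getClass().getGenericSuperclass();
    if (superType instanceof ParameterizedType) {
      Type output = ((ParameterizedType) superType).getActualTypeArguments()[2];
      if (output instanceof Class) {
        return (Class<?>) output;
      }
    }
    // Type variable (or raw superclass): the best we can say is Object.
    return Object.class;
  }

  public static void main(String[] args) {
    System.out.println(resolvedOutputType(new ConcreteFn()));      // class java.lang.String
    System.out.println(resolvedOutputType(new GenericFn<Long>())); // class java.lang.Object
  }
}
```

When OutputT is a bare type variable at runtime, inference has nothing to work with, which is why supplying the output coder explicitly (for example through `getDefaultOutputCoder`) sidesteps the problem.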
Re: Combine.Global
I wasn't able to reproduce the issue you're experiencing. I've created a gist with an example that works and is similar to what you have described. Please help us make tweaks to the gist to reproduce your problem:
https://gist.github.com/aviemzur/ba213d98b4484492099b3cf709ddded0

On Fri, Apr 7, 2017 at 7:25 PM Paul Gerver wrote:
> Yes, the pipeline is quite small:
>
> pipeline.apply("source", Read.from(new CustomSource())).setCoder(CustomSource.coder)
>     .apply("GlobalCombine", Combine.globally(new CustomCombineFn())).setCoder(CustomTuple.coder);
>
> The InputT is not the same as OutputT, so the input coder can't be used.
>
> On 2017-04-07 08:58 (-0500), Aviem Zur wrote:
> > Have you set the coder for your input PCollection? The one on which you
> > perform the Combine?
Re: Combine.Global
Have you set the coder for your input PCollection, the one on which you perform the Combine?

On Fri, Apr 7, 2017 at 4:24 PM Paul Gerver wrote:
> Hello All,
>
> I'm trying to test out a Combine.Globally transform which takes in a small
> custom class (CustomA) and outputs a secondary custom class (CustomB). I
> have set the coder for the resulting PCollection, but Beam is
> arguing that a coder for a KV type is missing (see output at bottom).
Combine.Global
Hello All,

I'm trying to test out a Combine.Globally transform which takes in a small custom class (CustomA) and outputs a secondary custom class (CustomB). I have set the coder for the resulting PCollection, but Beam is arguing that a coder for a KV type is missing (see output at bottom).

Since this is a global combine, neither the input nor the output is of KV type, so I decided to take a look at the Combine code. Combine.Globally.expand() performs a Combine.perKey and a GroupedValues under the covers, which requires making an intermediate PCollection of KV whose coder, according to the docs, is inferred from the CombineFn.

I believe I could work around this by registering a KvCoder with the CoderRegistry, but that's not intuitive. Is there a better way to address this currently, or should something be added to CombineFn for setting an output coder, similar to PCollection?

Output:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for GlobalCombine/Combine.perKey(CustomTuple)/Combine.GroupedValues/ParDo(Anonymous).out [Class]. Correct one of the following root causes:
  No Coder has been manually specified; you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Unable to provide a default Coder for org.apache.beam.sdk.values.KV . Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV : Unable to provide a default Coder for java.lang.Object. Correct one of the following root causes:

Stack:
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:174)
    at org.apache.beam.sdk.values.TypedPValue.getCoder(TypedPValue.java:51)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:130)
    at org.apache.beam.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:90)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:95)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:386)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:302)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1460)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1337)
    at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
    at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:296)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:388)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:318)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
    at org.iastate.edu.CombineTestPipeline.main(CombineTestPipeline.java:110)

Let me know. Thanks!
-Paul G

--
*Paul Gerver*
pfger...@gmail.com
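The expansion described above can be illustrated with a Beam-free model: every element is paired with the same null key (typed `Void` here), values are combined per key, and the key is then dropped. The intermediate per-key result has the shape KV<Void, OutputT>, which appears to correspond to the per-key output collection named in the error. `ModelCombineFn`, `MeanFn` and `GlobalCombineModel` are hypothetical; the merge step and the runner's parallelism are omitted.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Beam-free combine fn (merge step omitted for brevity).
interface ModelCombineFn<InputT, AccumT, OutputT> {
  AccumT createAccumulator();
  AccumT addInput(AccumT acc, InputT input);
  OutputT extractOutput(AccumT acc);
}

// Mean of Integers: InputT = Integer, AccumT = long[]{sum, count}, OutputT = Double.
class MeanFn implements ModelCombineFn<Integer, long[], Double> {
  public long[] createAccumulator() {
    return new long[] {0L, 0L};
  }

  public long[] addInput(long[] acc, Integer input) {
    acc[0] += input;
    acc[1] += 1;
    return acc;
  }

  public Double extractOutput(long[] acc) {
    return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
  }
}

class GlobalCombineModel {
  static <InputT, AccumT, OutputT> OutputT globally(
      List<InputT> inputs, ModelCombineFn<InputT, AccumT, OutputT> fn) {
    // Step 1: pair every element with the same null key.
    List<Map.Entry<Void, InputT>> keyed = new ArrayList<>();
    for (InputT input : inputs) {
      keyed.add(new SimpleEntry<Void, InputT>(null, input));
    }

    // Step 2: combine per key. HashMap accepts the null key, and since every
    // element shares it there is exactly one accumulator.
    Map<Void, AccumT> accumulators = new HashMap<>();
    for (Map.Entry<Void, InputT> kv : keyed) {
      AccumT acc = accumulators.containsKey(kv.getKey())
          ? accumulators.get(kv.getKey())
          : fn.createAccumulator();
      accumulators.put(kv.getKey(), fn.addInput(acc, kv.getValue()));
    }

    // Intermediate result of type KV<Void, OutputT>: in a real pipeline this
    // collection needs a coder for OutputT, the coder that could not be inferred.
    List<Map.Entry<Void, OutputT>> perKey = new ArrayList<>();
    for (Map.Entry<Void, AccumT> e : accumulators.entrySet()) {
      perKey.add(new SimpleEntry<Void, OutputT>(e.getKey(), fn.extractOutput(e.getValue())));
    }

    // Step 3: drop the key. In this model, empty input falls back to the fn's empty result.
    return perKey.isEmpty()
        ? fn.extractOutput(fn.createAccumulator())
        : perKey.get(0).getValue();
  }

  public static void main(String[] args) {
    System.out.println(globally(Arrays.asList(1, 2, 3, 4), new MeanFn())); // 2.5
  }
}
```

Even though neither the input (Integer) nor the output (Double) is a KV, the model has to hold a KV-shaped intermediate, and that is the collection whose coder the pipeline must be able to construct.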