Re: Combine.Global

2017-04-12 Thread Paul Gerver
Ah, I found my mistake. You overrode the getAccumulatorCoder and
getDefaultOutputCoder methods, which my implementation did not. This approach
is straightforward now.

Thanks!

On 2017-04-07 23:46 (-0500), Aviem Zur  wrote: 
> I wasn't able to reproduce the issue you're experiencing.
> I've created a gist with an example that works and is similar to what you
> have described.
> Please help us tweak the gist so that it reproduces your problem:
> https://gist.github.com/aviemzur/ba213d98b4484492099b3cf709ddded0
> 


Re: Combine.Global

2017-04-10 Thread Thomas Groh
This looks like it might be because the output coder cannot be determined.
It looks like the registry understands that it must build a KvCoder, but
cannot infer the coder for OutputT. More specifically, within the stack
trace, the following line occurs:

"Unable to provide a default Coder for java.lang.Object. Correct one of the
following root causes:"
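
One way a type can end up as java.lang.Object during inference is an output
type parameter that is still an unbound type variable. The sketch below uses
only plain Java reflection to show the difference. Fn, ConcreteFn and OpenFn
are hypothetical stand-ins; this is not Beam's inference code, and it may not
be the cause in this particular pipeline.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class OutputTypeDemo {
    /** Hypothetical stand-in for a function with input and output type parameters. */
    public abstract static class Fn<InputT, OutputT> {}

    /** Binds both type parameters to concrete classes. */
    public static class ConcreteFn extends Fn<String, Integer> {}

    /** Leaves the output type parameter as a type variable. */
    public static class OpenFn<T> extends Fn<String, T> {}

    /** Returns the second type argument of the generic superclass. */
    public static Type outputTypeOf(Class<?> fnClass) {
        ParameterizedType superType = (ParameterizedType) fnClass.getGenericSuperclass();
        return superType.getActualTypeArguments()[1];
    }

    /** Describes what reflection can recover for the output type. */
    public static String describe(Class<?> fnClass) {
        Type t = outputTypeOf(fnClass);
        if (t instanceof Class) {
            return ((Class<?>) t).getName();
        }
        if (t instanceof TypeVariable) {
            // An unbounded type variable has the single implicit bound Object.
            Type bound = ((TypeVariable<?>) t).getBounds()[0];
            return "unresolved, bounded by " + bound.getTypeName();
        }
        return t.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(describe(ConcreteFn.class));
        System.out.println(describe(OpenFn.class));
    }
}
```

When the output type is only known as a type variable, nothing more specific
than Object is available to look up, which is why supplying the coder
explicitly sidesteps the problem.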

CombineFn provides a `getDefaultOutputCoder(CoderRegistry, Coder<InputT>)`
method which may be suitable here for producing the coder for your outputs.

(I can produce a very similar stack trace:
https://gist.github.com/tgroh/04d4b638e7fabf8a03187760ddb26eef)
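
The override pattern can be sketched without the Beam SDK on the classpath.
The model below is simplified: Coder and CombineFn here are hypothetical
stand-ins rather than Beam's classes, and the CoderRegistry argument that the
real hooks take is left out. It only shows how a function that overrides its
coder hooks avoids the inference step that fails by default.

```java
public class CoderHookDemo {
    /** Hypothetical stand-in for a coder; this is not Beam's Coder class. */
    public interface Coder<T> {
        String describe();
    }

    /** Hypothetical stand-in for a combine function exposing coder hooks. */
    public abstract static class CombineFn<InputT, AccumT, OutputT> {
        // In this model the defaults know nothing about the types and fail.
        public Coder<AccumT> getAccumulatorCoder(Coder<InputT> inputCoder) {
            throw new IllegalStateException("cannot infer accumulator coder");
        }

        public Coder<OutputT> getDefaultOutputCoder(Coder<InputT> inputCoder) {
            throw new IllegalStateException("cannot infer output coder");
        }
    }

    /** Overrides both hooks, so no inference is needed. */
    public static class ExplicitCodersFn extends CombineFn<String, long[], Long> {
        @Override
        public Coder<long[]> getAccumulatorCoder(Coder<String> inputCoder) {
            return () -> "LongArrayCoder";
        }

        @Override
        public Coder<Long> getDefaultOutputCoder(Coder<String> inputCoder) {
            return () -> "LongCoder";
        }
    }

    /** Leaves the hooks alone, so the failing defaults apply. */
    public static class InferredCodersFn extends CombineFn<String, long[], Long> {}

    /** Asks the function for its output coder and reports a failure as text. */
    public static String outputCoderFor(CombineFn<String, ?, ?> fn) {
        Coder<String> inputCoder = () -> "StringCoder";
        try {
            return fn.getDefaultOutputCoder(inputCoder).describe();
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outputCoderFor(new ExplicitCodersFn()));
        System.out.println(outputCoderFor(new InferredCodersFn()));
    }
}
```

In a real pipeline the equivalent overrides would return the custom coders
for the accumulator and output classes.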

On Fri, Apr 7, 2017 at 9:46 PM, Aviem Zur  wrote:

> I wasn't able to reproduce the issue you're experiencing.
> I've created a gist with an example that works and is similar to what you
> have described.
> Please help us tweak the gist so that it reproduces your problem:
> https://gist.github.com/aviemzur/ba213d98b4484492099b3cf709ddded0
>

Re: Combine.Global

2017-04-07 Thread Aviem Zur
I wasn't able to reproduce the issue you're experiencing.
I've created a gist with an example that works and is similar to what you
have described.
Please help us tweak the gist so that it reproduces your problem:
https://gist.github.com/aviemzur/ba213d98b4484492099b3cf709ddded0

On Fri, Apr 7, 2017 at 7:25 PM Paul Gerver  wrote:

> Yes, the pipeline is quite small:
>
> pipeline
>     .apply("source", Read.from(new CustomSource()))
>     .setCoder(CustomSource.coder)
>     .apply("GlobalCombine", Combine.globally(new CustomCombineFn()))
>     .setCoder(CustomTuple.coder);
>
>
> The InputT is not the same as OutputT, so the input coder can't be used.
>


Re: Combine.Global

2017-04-07 Thread Aviem Zur
Have you set the coder for your input PCollection? The one on which you
perform the Combine?


Combine.Global

2017-04-07 Thread Paul Gerver
Hello All,

I'm trying to test out a Combine.Globally transform which takes in a small
custom class (CustomA) and outputs a second custom class (CustomB). I
have set the coder for the resulting PCollection, but Beam complains
that a coder for a KV type is missing (see output at bottom).

Since this is a global combine, neither the input nor the output is of KV
type, so I decided to take a look at the Combine code.
Combine.Globally.expand() performs a perKey and groupedValues under the
covers, which requires making an intermediate PCollection of KVs whose
coder--according to the docs--is inferred from the CombineFn.

I believe I could work around this by registering a KvCoder with the
CoderRegistry, but that's not intuitive. Is there a better way to address
this currently, or should something be added to the CombineFn area for
setting an output coder, similar to PCollection?


Output:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for GlobalCombine/Combine.perKey(CustomTuple)/Combine.GroupedValues/ParDo(Anonymous).out [Class]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Unable to provide a default Coder for org.apache.beam.sdk.values.KV. Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV: Unable to provide a default Coder for java.lang.Object. Correct one of the following root causes:


Stack:
at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:174)
at org.apache.beam.sdk.values.TypedPValue.getCoder(TypedPValue.java:51)
at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:130)
at org.apache.beam.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:90)
at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:95)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:386)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:302)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1460)
at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1337)
at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:296)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:388)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:318)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
at org.iastate.edu.CombineTestPipeline.main(CombineTestPipeline.java:110)


Let me know. Thanks!
-Paul G

-- 
*Paul Gerver*
pfger...@gmail.com