There are probably smarter people than me on this list, but since I recently
went through a similar thought exercise...

For the generic use case in Kettle, I have a PCollection<KettleRow> going
through the pipeline.
KettleRow is just an Object[] wrapper for which I can implement a Coder.

The "group by" that I implemented does the following: it splits the
PCollection<KettleRow> into a PCollection<KV<KettleRow, KettleRow>>,
then applies the standard GroupByKey.create(), giving us a
PCollection<KV<KettleRow, Iterable<KettleRow>>>.
This means that we can simply aggregate all the elements in the
Iterable<KettleRow> to aggregate a group.
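The shape of that group-by (split into key/value, group, then aggregate each group's Iterable) can be modelled in memory without Beam. This is a stdlib-only illustration, not the GroupByTransform code linked in this mail: rows are plain Object[], the key and value columns are picked by hypothetical index arrays, and the aggregation is a hypothetical sum over the first value column. Keys are wrapped in a List because Object[] only has identity equals; Beam itself groups by the encoded form of the key, which is one reason the key Coder matters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model (no Beam) of split -> GroupByKey -> aggregate over Object[] rows.
// Column indices and the "sum" aggregation are hypothetical choices for illustration.
class GroupByModel {

    // Split each row into a key part and a value part, then collect all values per key,
    // mimicking how GroupByKey turns KV<K, V> elements into KV<K, Iterable<V>>.
    // Keys are wrapped in a List because Object[] only has identity-based equals().
    static Map<List<Object>, List<Object[]>> splitAndGroup(
            List<Object[]> rows, int[] keyIdx, int[] valueIdx) {
        Map<List<Object>, List<Object[]>> groups = new LinkedHashMap<>();
        for (Object[] row : rows) {
            List<Object> key = new ArrayList<>();
            for (int i : keyIdx) {
                key.add(row[i]);
            }
            Object[] value = new Object[valueIdx.length];
            for (int j = 0; j < valueIdx.length; j++) {
                value[j] = row[valueIdx[j]];
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return groups;
    }

    // Aggregate one group by iterating its values: here, sum the first value column,
    // which is assumed to hold Long values.
    static long sumFirstColumn(Iterable<Object[]> values) {
        long total = 0;
        for (Object[] v : values) {
            total += (Long) v[0];
        }
        return total;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
                new Object[] {"NL", 3L},
                new Object[] {"BE", 5L},
                new Object[] {"NL", 4L});
        Map<List<Object>, List<Object[]>> groups =
                splitAndGroup(rows, new int[] {0}, new int[] {1});
        for (Map.Entry<List<Object>, List<Object[]>> e : groups.entrySet()) {
            // Prints "[NL] -> 7" and then "[BE] -> 5" (LinkedHashMap keeps first-seen order).
            System.out.println(e.getKey() + " -> " + sumFirstColumn(e.getValue()));
        }
    }
}
```

In a real pipeline, the grouping step is what GroupByKey does for you across workers; only the per-group aggregation stays in your own code.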

Well, at least that works for me. The code is open, so have a look at it here:
https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/transform/GroupByTransform.java

Like you, I had trouble with the Coder for my KettleRows, so I hacked up this
to make it work:
https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/coder/KettleRowCoder.java
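For context on what such a Coder has to do: a Beam Coder essentially writes a value to an OutputStream (encode) and reads it back from an InputStream (decode). The sketch below is not the KettleRowCoder linked above; it is a stdlib-only illustration that assumes rows only contain null, String or Long fields, using a hypothetical RowCodecSketch class with one-byte type tags. In a real pipeline this logic would sit in the encode/decode methods of a Coder subclass. The encoding is deterministic (equal rows always produce equal bytes), which matters because GroupByKey requires a deterministic key coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

// Stdlib-only sketch of a row encoding: Object[] -> bytes -> Object[].
// Supported field types (null, String, Long) are an assumption for brevity.
class RowCodecSketch {
    private static final byte TAG_NULL = 0, TAG_STRING = 1, TAG_LONG = 2;

    static void encode(Object[] row, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(row.length); // field count first, so decode knows the row size
        for (Object field : row) {
            if (field == null) {
                data.writeByte(TAG_NULL);
            } else if (field instanceof String) {
                data.writeByte(TAG_STRING);
                data.writeUTF((String) field); // writeUTF is limited to 65535 encoded bytes
            } else if (field instanceof Long) {
                data.writeByte(TAG_LONG);
                data.writeLong((Long) field);
            } else {
                throw new IOException("Unsupported field type: " + field.getClass());
            }
        }
        data.flush(); // flush but do not close: the caller owns the stream
    }

    static Object[] decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        Object[] row = new Object[data.readInt()];
        for (int i = 0; i < row.length; i++) {
            byte tag = data.readByte();
            switch (tag) {
                case TAG_NULL: row[i] = null; break;
                case TAG_STRING: row[i] = data.readUTF(); break;
                case TAG_LONG: row[i] = data.readLong(); break;
                default: throw new IOException("Unknown tag: " + tag);
            }
        }
        return row;
    }

    public static void main(String[] args) throws IOException {
        Object[] row = {"Kettle", 42L, null};
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        encode(row, bytes);
        Object[] back = decode(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(Arrays.toString(back)); // prints [Kettle, 42, null]
    }
}
```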

It's set on the pipeline:
pipeline.getCoderRegistry().registerCoderForClass(KettleRow.class, new KettleRowCoder());
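Registering a coder for the concrete KettleRow class works because that type is known when the pipeline is built. The error in the quoted question ("Unable to provide a Coder for K") most likely comes from Java type erasure: roughly, Beam infers a SimpleFunction's output type from the anonymous subclass's generic superclass, and inside PlatformGroupByKey<K, V> that subclass only records the type variable K, not the type the transform was instantiated with. The stdlib-only sketch below reproduces this with a hypothetical TypedFn class standing in for SimpleFunction; it is an illustration, not Beam's inference code. A commonly used workaround (not tested here) is to pass a Coder<K> and a Coder<V> into the transform and call setCoder(KvCoder.of(keyCoder, valueCoder)) on the output of the MapToKV step.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

class ErasureDemo {
    // Hypothetical stand-in for Beam's SimpleFunction: a generic abstract class
    // that user code subclasses anonymously.
    abstract static class TypedFn<InputT, OutputT> {
        abstract OutputT apply(InputT input);

        // Read the OutputT argument via reflection on the anonymous subclass's superclass.
        Type outputType() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[1];
        }
    }

    // Mirrors PlatformGroupByKey<K, V>: K is a type parameter of the enclosing class,
    // so the anonymous subclass can only record "K", whatever K is at the call site.
    static class GenericWrapper<K> {
        TypedFn<String, K> makeFn() {
            return new TypedFn<String, K>() {
                @Override
                K apply(String input) {
                    return null;
                }
            };
        }
    }

    // With a concrete type argument, the anonymous subclass records java.lang.Long.
    static TypedFn<String, Long> concreteFn() {
        return new TypedFn<String, Long>() {
            @Override
            Long apply(String input) {
                return Long.valueOf(input.length());
            }
        };
    }

    public static void main(String[] args) {
        // Even though the wrapper is instantiated with Long, only the variable K survives.
        Type generic = new GenericWrapper<Long>().makeFn().outputType();
        Type concrete = concreteFn().outputType();
        System.out.println(generic + " is a TypeVariable: " + (generic instanceof TypeVariable)); // K is a TypeVariable: true
        System.out.println(concrete); // class java.lang.Long
    }
}
```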

Good luck!
Matt

On Sun, 2 Dec 2018 at 20:57, Eran Twili <[email protected]> wrote:

> Hi,
>
> We are considering using Beam in our software.
>
> We wish to create a service for a user which will operate Beam on their
> behalf, and obviously the user code doesn't have visibility of the Beam API.
>
> For that we need to make parts of the Beam API generic.
>
> So the user supplies functions and we embed them in a generic *PTransform*
> and run them in a Beam pipeline.
>
> We are having some difficulty understanding how we can give the user the
> option to perform a *GroupByKey* operation.
>
> The problem is that *GroupByKey* takes *KV* and our *PCollections* hold
> only user datatypes, which should not be Beam datatypes.
>
> So we thought about having this *PTransform*:
>
> public class PlatformGroupByKey<K, V> extends
>         PTransform<PCollection<CustomType<SimpleImmutableEntry<K, V>>>,
>                    PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>>> {
>     @Override
>     public PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>> expand(
>             PCollection<CustomType<SimpleImmutableEntry<K, V>>> input) {
>
>         return input
>                 .apply("MapToKV",
>                         MapElements.via(
>                             new SimpleFunction<CustomType<SimpleImmutableEntry<K, V>>, KV<K, V>>() {
>                                 @Override
>                                 public KV<K, V> apply(CustomType<SimpleImmutableEntry<K, V>> kv) {
>                                     return KV.of(kv.field.getKey(), kv.field.getValue());
>                                 }
>                             }))
>                 .apply("GroupByKey", GroupByKey.create())
>                 .apply("MapToSimpleImmutableEntry",
>                         MapElements.via(
>                             new SimpleFunction<KV<K, Iterable<V>>,
>                                     CustomType<SimpleImmutableEntry<K, Iterable<V>>>>() {
>                                 @Override
>                                 public CustomType<SimpleImmutableEntry<K, Iterable<V>>> apply(
>                                         KV<K, Iterable<V>> kv) {
>                                     return new CustomType<>(
>                                             new SimpleImmutableEntry<>(kv.getKey(), kv.getValue()));
>                                 }
>                             }));
>     }
> }
>
> In this transform we take a *PCollection* of our key-value type (Java's
> *SimpleImmutableEntry*), convert it to *KV*, perform the *GroupByKey*,
> and convert it back to *SimpleImmutableEntry*.
>
> But we get this error at runtime:
>
> java.lang.IllegalStateException: Unable to return a default Coder for
> GroupByKey/MapToKV/Map/ParMultiDo(Anonymous).output [PCollection]. Correct
> one of the following root causes:
>   No Coder has been manually specified;  you may do so using .setCoder().
>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder
>   for parameterized type org.apache.beam.sdk.values.KV<K, V>: Unable to
>   provide a Coder for K.
>   Building a Coder using a registered CoderProvider failed.
>   See suppressed exceptions for detailed failures.
>   Using the default output Coder from the producing PTransform failed:
>   PTransform.getOutputCoder called.
>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>     at org.apache.beam.examples.platform.PlatformGroupByKey.expand(PlatformGroupByKey.java:27)
>
> We don't understand why the generic type *K* still shows up at runtime.
>
> At runtime it should be known from the concrete *PCollection* input
> parameter that is passed to the *expand* method.
>
> What are we doing wrong? Is there a way to achieve what we want using Beam?
>
> We'd appreciate any help.
>
> Regards,
>
> Eran
>
