There are probably smarter people than me on this list, but since I have recently been through a similar thought exercise...
For the generic use in Kettle I have a PCollection<KettleRow> going through the pipeline. KettleRow is just an Object[] wrapper for which I can implement a Coder.

The "group by" that I implemented does the following:

- Split PCollection<KettleRow> into PCollection<KV<KettleRow, KettleRow>>.
- Apply the standard GroupByKey.create(), giving us PCollection<KV<KettleRow, Iterable<KettleRow>>>.

This means that we can simply aggregate all the elements in Iterable<KettleRow> to aggregate a group. Well, at least that works for me. The code is open, so have a look at it over here:

https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/transform/GroupByTransform.java

Like you, I had trouble with the Coder for my KettleRows, so I hacked this up to make it work:

https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/coder/KettleRowCoder.java

It's set on the pipeline:

    pipeline.getCoderRegistry().registerCoderForClass(KettleRow.class, new KettleRowCoder());

Good luck!

Matt

On Sun, 2 Dec 2018 at 20:57, Eran Twili <[email protected]> wrote:

> Hi,
>
> We are considering using Beam in our software.
>
> We wish to create a service for a user which will operate Beam for him,
> and obviously the user code doesn't have Beam API visibility.
>
> For that we need to generify some Beam API.
>
> So the user supplies functions and we embed them in a generic *PTransform*
> and run them in a Beam pipeline.
>
> We have some difficulty understanding how we can provide the user with
> the option to perform a *GroupByKey* operation.
>
> The problem is that *GroupByKey* takes *KV* and our *PCollections* hold
> only user datatypes, which should not be Beam datatypes.
>
> So we thought about having this *PTransform*:
>
> public class PlatformGroupByKey<K,V> extends
>     PTransform<PCollection<CustomType<SimpleImmutableEntry<K,V>>>,
>                PCollection<CustomType<SimpleImmutableEntry<K,Iterable<V>>>>> {
>   @Override
>   public PCollection<CustomType<SimpleImmutableEntry<K,Iterable<V>>>>
>       expand(PCollection<CustomType<SimpleImmutableEntry<K,V>>> input) {
>
>     return input
>         .apply("MapToKV",
>             MapElements.via(
>                 new SimpleFunction<CustomType<SimpleImmutableEntry<K,V>>, KV<K, V>>() {
>                   @Override
>                   public KV<K, V> apply(CustomType<SimpleImmutableEntry<K,V>> kv) {
>                     return KV.of(kv.field.getKey(), kv.field.getValue()); }}))
>         .apply("GroupByKey",
>             GroupByKey.create())
>         .apply("MapToSimpleImmutableEntry",
>             MapElements.via(
>                 new SimpleFunction<KV<K, Iterable<V>>,
>                     CustomType<SimpleImmutableEntry<K,Iterable<V>>>>() {
>                   @Override
>                   public CustomType<SimpleImmutableEntry<K,Iterable<V>>> apply(KV<K, Iterable<V>> kv) {
>                     return new CustomType<>(
>                         new SimpleImmutableEntry<>(kv.getKey(), kv.getValue())); }}));
>   }
> }
>
> In which we will get a *PCollection* of our key-value type (Java's
> *SimpleImmutableEntry*),
>
> convert it to *KV*,
>
> perform the *GroupByKey*,
>
> and convert it back to *SimpleImmutableEntry*.
>
> But we get this error at runtime:
>
> java.lang.IllegalStateException: Unable to return a default Coder for
> GroupByKey/MapToKV/Map/ParMultiDo(Anonymous).output [PCollection]. Correct
> one of the following root causes:
>
>   No Coder has been manually specified; you may do so using .setCoder().
>
>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder
>   for parameterized type org.apache.beam.sdk.values.KV<K, V>: Unable to
>   provide a Coder for K.
>
>     Building a Coder using a registered CoderProvider failed.
>
>     See suppressed exceptions for detailed failures.
>
>   Using the default output Coder from the producing PTransform failed:
>   PTransform.getOutputCoder called.
>
>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>     at org.apache.beam.examples.platform.PlatformGroupByKey.expand(PlatformGroupByKey.java:27)
>
> We don't understand why the generic type *K* gets into runtime.
>
> At runtime it will be known from the concrete *PCollection* input
> parameter that is sent to the *expand* method.
>
> What are we doing wrong? Is there a way to achieve what we want using Beam?
>
> Appreciate any help.
>
> Regards,
>
> Eran
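
P.S. For readers who want the shape of the group-by steps described at the top of this reply without opening the repository, here is a minimal, Beam-free sketch in plain Java. It is not the code from GroupByTransform.java. The class name GroupBySketch, the helper names, the idea of choosing key and value fields by column index, and the "sum the first value field" aggregation are all assumptions made for illustration. A Map stands in for what GroupByKey does in a real pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain collections stand in for PCollection and GroupByKey.
final class GroupBySketch {

    /** Copies the selected column indexes of a row into a new list (hypothetical helper). */
    static List<Object> project(Object[] row, int[] indexes) {
        List<Object> out = new ArrayList<>(indexes.length);
        for (int i : indexes) {
            out.add(row[i]);
        }
        return out;
    }

    /**
     * Step 1: split each row into a (key fields, value fields) pair.
     * Step 2: collect all value rows that share a key, as a grouping step would.
     * Lists are used for keys because arrays do not have value-based equals/hashCode.
     */
    static Map<List<Object>, List<List<Object>>> groupByKey(
            List<Object[]> rows, int[] keyIdx, int[] valueIdx) {
        Map<List<Object>, List<List<Object>>> groups = new LinkedHashMap<>();
        for (Object[] row : rows) {
            groups.computeIfAbsent(project(row, keyIdx), k -> new ArrayList<>())
                  .add(project(row, valueIdx));
        }
        return groups;
    }

    /** Step 3: aggregate each group; here the first value field is summed as a long. */
    static Map<List<Object>, Long> sumFirstValue(Map<List<Object>, List<List<Object>>> groups) {
        Map<List<Object>, Long> sums = new LinkedHashMap<>();
        groups.forEach((key, values) -> {
            long total = 0;
            for (List<Object> value : values) {
                total += ((Number) value.get(0)).longValue();
            }
            sums.put(key, total);
        });
        return sums;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
            new Object[] {"NL", "Amsterdam", 10L},
            new Object[] {"BE", "Gent", 5L},
            new Object[] {"NL", "Utrecht", 7L});
        // Group on column 0 and aggregate column 2.
        Map<List<Object>, Long> sums =
            sumFirstValue(groupByKey(rows, new int[] {0}, new int[] {2}));
        System.out.println(sums); // prints "{[NL]=17, [BE]=5}"
    }
}
```

In Beam itself the grouping compares keys by their encoded form, so the coder used for the key row needs to be deterministic; the in-memory Map above sidesteps that concern.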

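On the Coder side, the core of a coder for an Object[] wrapper is a pair of encode/decode routines that write enough type information to read each field back. The following is a rough sketch of that idea using only java.io. It is not the linked KettleRowCoder, whose format may differ. The class name RowCodecSketch, the one-byte type tags, and the set of supported field types are assumptions. In a Beam pipeline, logic like this would typically sit in the encode and decode methods of a class extending CustomCoder<KettleRow>, which is then registered as shown earlier in this mail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical illustration only: a tagged binary format for rows of simple field types.
final class RowCodecSketch {
    // One-byte type tags written before each field (assumed wire format).
    private static final int TAG_NULL = 0;
    private static final int TAG_STRING = 1;
    private static final int TAG_LONG = 2;
    private static final int TAG_DOUBLE = 3;
    private static final int TAG_BOOLEAN = 4;

    /** Writes the field count, then a type tag and payload for every field. */
    static void encode(Object[] row, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(row.length);
        for (Object field : row) {
            if (field == null) {
                data.writeByte(TAG_NULL);
            } else if (field instanceof String) {
                data.writeByte(TAG_STRING);
                data.writeUTF((String) field); // writeUTF is limited to 65535 encoded bytes
            } else if (field instanceof Long) {
                data.writeByte(TAG_LONG);
                data.writeLong((Long) field);
            } else if (field instanceof Double) {
                data.writeByte(TAG_DOUBLE);
                data.writeDouble((Double) field);
            } else if (field instanceof Boolean) {
                data.writeByte(TAG_BOOLEAN);
                data.writeBoolean((Boolean) field);
            } else {
                throw new IOException("Unsupported field type: " + field.getClass().getName());
            }
        }
        data.flush();
    }

    /** Reads back exactly what encode wrote, consuming no extra bytes from the stream. */
    static Object[] decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        Object[] row = new Object[data.readInt()];
        for (int i = 0; i < row.length; i++) {
            int tag = data.readByte();
            switch (tag) {
                case TAG_NULL:    row[i] = null; break;
                case TAG_STRING:  row[i] = data.readUTF(); break;
                case TAG_LONG:    row[i] = data.readLong(); break;
                case TAG_DOUBLE:  row[i] = data.readDouble(); break;
                case TAG_BOOLEAN: row[i] = data.readBoolean(); break;
                default: throw new IOException("Unknown type tag: " + tag);
            }
        }
        return row;
    }

    public static void main(String[] args) throws IOException {
        Object[] row = {"NL", 17L, 2.5, true, null};
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        encode(row, bytes);
        Object[] copy = decode(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(Arrays.equals(row, copy)); // prints "true"
    }
}
```

Equal rows always produce the same bytes in this format, which matters when the row is also used as a grouping key.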