Awesome, thanks for the info! It worked like a charm!
On Thu, Apr 4, 2024 at 9:49 PM Reuven Lax <[email protected]> wrote:
>
> There are ways you can manually force the coder here. However, I would first
> try to split the KV creation into two operations. Have ProcessEvents just
> create a PCollection<SharedCoreEvent>, and then add a following operation to
> create the KV. Something like this:
>
> input.apply(ParDo.of(new ProcessEvents()))
>     .apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>) ExtractKeyFunction)
>         .withKeyType(TypeDescriptors.longs()));
>
> I suspect that this will allow the mechanism to better infer the final Coder.
> If that doesn't work, you could always brute-force it like this:
>
> PCollection<SharedCoreEvent> coreEvents = input.apply(ParDo.of(new ProcessEvents()));
> coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>) ExtractKeyFunction)
>         .withKeyType(TypeDescriptors.longs()))
>     .setCoder(KvCoder.of(LongCoder.of(), coreEvents.getCoder()))
>     .apply(Reshuffle.of())
>     ... etc.
>
> On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas <[email protected]> wrote:
>>
>> ProcessEvents receives a Session object as input and creates a KV<Long,
>> SharedCoreEvent> as output.
>>
>> On Thu, Apr 4, 2024 at 8:52 PM Reuven Lax via user <[email protected]> wrote:
>>>
>>> There are some sharp edges unfortunately around auto-inference of KV coders
>>> and schemas. Is there a previous PCollection of type SharedCoreEvent, or is
>>> the SharedCoreEvent created in ProcessEvents?
>>>
>>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas <[email protected]> wrote:
>>>>
>>>> Hello guys,
>>>>
>>>> I have a question: is it possible to use KV along with AutoValueSchema
>>>> objects? I'm having trouble when I try to use them together.
>>>>
>>>> I have an object like the following:
>>>>
>>>> @AutoValue
>>>> @DefaultSchema(AutoValueSchema.class)
>>>> public abstract class SharedCoreEvent {
>>>>
>>>>     @JsonProperty("subscriptionId")
>>>>     public abstract String getSubscription();
>>>>
>>>>     <other properties>
>>>> }
>>>>
>>>> Then I have a pipeline like the following:
>>>>
>>>> input.apply(ParDo.of(new ProcessEvents()))
>>>>     .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>>>>
>>>> My input is a single object and ProcessEvents produces tons of events
>>>> in a fan-out fashion; that is why I used Reshuffle here.
>>>>
>>>> But when I run this pipeline it throws the following error:
>>>>
>>>> java.lang.IllegalStateException: Unable to return a default Coder for
>>>> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
>>>> [PCollection@2131266396]. Correct one of the following root causes:
>>>>   No Coder has been manually specified; you may do so using .setCoder().
>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder
>>>>   for parameterized type org.apache.beam.sdk.values.KV<java.lang.Long,
>>>>   events.SharedCoreEvent>: Unable to provide a Coder for
>>>>   events.SharedCoreEvent
>>>>   Building a Coder using a registered CoderProvider failed.
>>>>
>>>> Something similar happens with my source when I use KafkaIO and the
>>>> source produces a KV<String, Session> PCollection.
>>>>
>>>> Is there any reason for this? Maybe I'm misusing the schemas?
>>>>
>>>> Really appreciate your help.
>>>>
>>>> Thanks
>>>> Ruben
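
For reference, a minimal sketch of the split suggested above, assuming ProcessEvents has been changed to a DoFn<Session, SharedCoreEvent> that emits plain events. The extractKey helper and VarLongCoder below are illustrative stand-ins for the ExtractKeyFunction and LongCoder.of() mentioned in the thread, and Session, SharedCoreEvent and ProcessEvents are assumed to live in the same package:

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KeyedSharedCoreEvents {

  public static PCollection<SharedCoreEvent> expand(PCollection<Session> input) {
    // Step 1: emit plain SharedCoreEvent values. With
    // @DefaultSchema(AutoValueSchema.class) on the class, Beam can infer a
    // schema-based coder for this PCollection on its own.
    PCollection<SharedCoreEvent> coreEvents =
        input.apply("ProcessEvents", ParDo.of(new ProcessEvents()));

    // Step 2: attach the key in a separate step. withKeyType supplies the
    // key-side type information that the lambda alone would erase.
    PCollection<KV<Long, SharedCoreEvent>> keyed =
        coreEvents.apply(
            "AttachKey",
            WithKeys.<Long, SharedCoreEvent>of(event -> extractKey(event))
                .withKeyType(TypeDescriptors.longs()));

    // Brute-force fallback from the thread, in case inference still fails:
    // build the KV coder from the key coder and the already-known value coder.
    keyed.setCoder(KvCoder.of(VarLongCoder.of(), coreEvents.getCoder()));

    // Step 3: fan out across workers, then drop the keys again.
    return keyed.apply(Reshuffle.of()).apply(Values.create());
  }

  // Hypothetical key extractor standing in for ExtractKeyFunction.
  private static Long extractKey(SharedCoreEvent event) {
    return (long) event.getSubscription().hashCode();
  }
}

The key point is that the schema-based coder is inferred for the plain SharedCoreEvent collection first, so the KV coder only has to be assembled from known parts rather than inferred for the parameterized KV type.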
