Awesome, thanks for the info! It worked like a charm!
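
For anyone landing on this thread later, here is a minimal, self-contained sketch of the explicit-coder variant quoted below. It is an illustration under assumptions, not the actual pipeline: Event is a hypothetical JavaFieldSchema POJO standing in for the AutoValue SharedCoreEvent (so the block compiles without the AutoValue annotation processor), the input is a plain String instead of a Session, and the key function (the event's index) is made up for the example.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KeyedSchemaSketch {

  // Hypothetical stand-in for SharedCoreEvent: a schema-backed POJO.
  @DefaultSchema(JavaFieldSchema.class)
  public static class Event {
    public String subscription;
    public long index;

    public Event() {}

    public Event(String subscription, long index) {
      this.subscription = subscription;
      this.index = index;
    }
  }

  // Fans one input element out into several events. It emits plain Event
  // values (not KV), so the schema coder can be inferred for this output.
  public static class ProcessEvents extends DoFn<String, Event> {
    @ProcessElement
    public void processElement(@Element String session, OutputReceiver<Event> out) {
      for (long i = 0; i < 3; i++) {
        out.output(new Event(session, i));
      }
    }
  }

  // Adds the key in a separate step and sets the KV coder explicitly,
  // reusing whatever coder was inferred for the un-keyed collection.
  public static PCollection<KV<Long, Event>> keyByIndex(PCollection<Event> events) {
    return events
        .apply(WithKeys.of((SerializableFunction<Event, Long>) e -> e.index)
            .withKeyType(TypeDescriptors.longs()))
        .setCoder(KvCoder.of(VarLongCoder.of(), events.getCoder()));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(); // assumes a runner (e.g. DirectRunner) on the classpath
    PCollection<Event> events =
        p.apply(Create.of("session-a", "session-b"))
            .apply(ParDo.of(new ProcessEvents()));
    keyByIndex(events)
        .apply(Reshuffle.of())
        .apply(Values.create());
    p.run().waitUntilFinish();
  }
}
```

The point of the split is that the coder for the plain event collection is resolved first, and the KV coder is then assembled from it by hand rather than left to inference.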

On Thu, Apr 4, 2024 at 9:49 PM Reuven Lax <[email protected]> wrote:
>
> There are ways to manually force the coder here. However, I would first
> try splitting the KV creation into two operations: have ProcessEvents just
> create a PCollection<SharedCoreEvent>, and then add a following operation
> to create the KV. Something like this:
>
> input.apply(ParDo.of(new ProcessEvents()))
>         .apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
>                 ExtractKeyFunction).withKeyType(TypeDescriptors.longs()));
>
> I suspect that this will allow the mechanism to better infer the final Coder. 
> If that doesn't work, you could always brute force it like this:
>
> PCollection<SharedCoreEvent> coreEvents =
>         input.apply(ParDo.of(new ProcessEvents()));
> coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
>                 ExtractKeyFunction).withKeyType(TypeDescriptors.longs()))
>          .setCoder(KvCoder.of(VarLongCoder.of(), coreEvents.getCoder()))
>          .apply(Reshuffle.of())
>          ... etc.
>
>
> On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas <[email protected]> wrote:
>>
>> ProcessEvents receives a Session object as input and creates a KV<Long,
>> SharedCoreEvent> as output.
>>
>> On Thu, Apr 4, 2024 at 8:52 PM Reuven Lax via user
>> <[email protected]> wrote:
>>>
>>> There are some sharp edges unfortunately around auto-inference of KV coders 
>>> and schemas. Is there a previous PCollection of type SharedCoreEvent, or is 
>>> the SharedCoreEvent created in ProcessEvents?
>>>
>>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas <[email protected]> wrote:
>>>>
>>>> Hello guys
>>>>
>>>> I have a question: is it possible to use KV together with
>>>> AutoValueSchema objects? I'm having trouble when I try to combine them.
>>>>
>>>> I have an object like the following
>>>>
>>>> @AutoValue
>>>> @DefaultSchema(AutoValueSchema.class)
>>>> public abstract class SharedCoreEvent {
>>>>
>>>> @JsonProperty("subscriptionId")
>>>> public abstract String getSubscription();
>>>>
>>>> <other properties>
>>>> }
>>>>
>>>> Then I have a pipeline like the following:
>>>>
>>>>  input.apply(ParDo.of(new ProcessEvents()))
>>>>         .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>>>>
>>>> My input is a single object, and ProcessEvents produces a large number
>>>> of events from it in a fan-out fashion. That is why I use Reshuffle here.
>>>>
>>>> But when I run this pipeline it throws the following error:
>>>>
>>>> lang.IllegalStateException: Unable to return a default Coder for
>>>> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
>>>> [PCollection@2131266396]. Correct one of the following root causes:
>>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>>
>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide
>>>> coder for parameterized type
>>>> org.apache.beam.sdk.values.KV<java.lang.Long, events.SharedCoreEvent>:
>>>> Unable to provide a Coder for events.SharedCoreEvent
>>>>   Building a Coder using a registered CoderProvider failed.
>>>>
>>>>
>>>> Something similar happens with my source when I use KafkaIO and the
>>>> source produces a KV<String,Session>  PCollection.
>>>>
>>>> Is there any reason for this? Maybe I'm misusing the schemas?
>>>>
>>>> Really appreciate your help
>>>>
>>>> Thanks
>>>> Ruben
