If you have two topics with different schemas in your pipeline, then you need to 
read them separately with two different KafkaIO instances and configure each 
instance with the proper deserializer for its schema.
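
As a rough sketch of what that could look like (not tested against your setup): the
helper below builds one KafkaIO read per topic and binds each to its own registry
subject. It uses the ConfluentSchemaRegistryDeserializerProvider mentioned earlier in
the thread, so with Apicurio you would swap in your own provider. The class name, the
helper name, the topic names, the URLs and the "<topic>-value" subject naming are all
assumptions made for illustration.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TwoTopicRead {

  // Hypothetical helper: one KafkaIO instance per topic, each with a
  // deserializer provider resolved from that topic's own subject.
  static PCollection<KV<String, GenericRecord>> readTopic(
      Pipeline pipeline, String bootstrapServers, String registryUrl, String topic) {
    // Assumption: subjects follow the "<topic>-value" naming convention.
    String subject = topic + "-value";
    return pipeline.apply(
        "Read " + topic,
        KafkaIO.<String, GenericRecord>read()
            .withBootstrapServers(bootstrapServers)
            .withTopic(topic)
            .withKeyDeserializer(StringDeserializer.class)
            // The provider supplies both the deserializer and the coder.
            .withValueDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, subject))
            .withoutMetadata());
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Placeholder endpoints and topic names; replace with your own.
    String bootstrapServers = "localhost:9092";
    String registryUrl = "http://localhost:8081";

    PCollection<KV<String, GenericRecord>> first =
        readTopic(pipeline, bootstrapServers, registryUrl, "topic-a");
    PCollection<KV<String, GenericRecord>> second =
        readTopic(pipeline, bootstrapServers, registryUrl, "topic-b");

    // Each collection can now be processed with transforms specific to its schema.
    pipeline.run().waitUntilFinish();
  }
}
```

Note that the provider looks up the schema in the registry to build the coder, so the
registry has to be reachable when the pipeline is constructed.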

—
Alexey

> On 9 Aug 2022, at 22:28, Sigalit Eliazov <e.siga...@gmail.com> wrote:
> 
> Thanks for your response.
> We have different messages with separate schemas.
> 
> I'll review the suggested solution.
> BR
> Sigalit
> 
> On Tue, Aug 9, 2022 at 3:40 PM Moritz Mack <mm...@talend.com> wrote:
> Hi Sigalit,
> 
>  
> 
> Could you explain a bit more in detail what you mean by 2 different types of 
> messages?
> 
> Do they share the same schema, e.g. using a union / one of type? Or are you 
> in fact talking about different messages with separate schemas (e.g. 
> discriminated using a message header)?
> 
>  
> 
> The recommended usage (at least with Confluent) is to use one schema per 
> topic. Using the Confluent registry it’s fairly simple then:
> 
>  
> 
>     .withValueDeserializer(
>         ConfluentSchemaRegistryDeserializerProvider.of(
>             registryUrl, subject, null /* latest */, config)))
> 
>  
> 
> Most likely you have to implement a similar DeserializerProvider for 
> Apicurio. You could also try using apicurio.registry.as-confluent, but that 
> requires configuring your producers accordingly.
> 
> In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider. 
> That should show you a path forward.
> 
>  
> 
> Best,
> 
> Moritz
> 
>  
> 
> On 09.08.22, 13:08, "Sigalit Eliazov" <e.siga...@gmail.com> wrote:
> 
>  
> 
> Hi all,
> 
> We have a single Kafka topic which is used to receive 2 different types of 
> messages.
> 
> Both messages are Avro.
> 
> So when reading messages from Kafka I used GenericRecord:
> 
>  
> 
> KafkaIO.<String, GenericRecord>read()
>         .withBootstrapServers(bootstrapServers)
>         .withTopic(topic)
>         .withConsumerConfigUpdates(ImmutableMap.of(
>                 SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
>                 ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
>                 SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
>         ))
>         .withKeyDeserializer(StringDeserializer.class)
> 
> I am not sure how to define the withValueDeserializer and coder.
> I tried to read the message as GenericRecord, but it fails with
>  "Could not extract the Kafka Deserializer type from class 
> io.apicurio.registry.serde.avro.AvroKafkaDeserialize" 
> I am using Apicurio as the schema registry.
>  
> Thanks
> Sigalit