If your pipeline reads two topics with different schemas, then you need to read them separately, with two different KafkaIO instances, and configure each instance with a deserialiser that matches its schema.
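
A minimal sketch of what that could look like, assuming a Confluent-compatible schema registry and the ConfluentSchemaRegistryDeserializerProvider mentioned further down the thread. The class name, topic names, subject names and URLs are hypothetical placeholders. With Apicurio you would probably need an equivalent DeserializerProvider of your own, or its Confluent-compatibility mode.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TwoTopicRead {
  public static void main(String[] args) {
    // Hypothetical endpoints; replace with your own.
    String bootstrapServers = "localhost:9092";
    String registryUrl = "http://localhost:8081";
    // Extra registry client settings (none in this sketch).
    Map<String, Object> registryConfig = new HashMap<>();

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // First topic: its own KafkaIO instance and its own subject/schema.
    // Note: the provider looks the schema up in the registry when the
    // pipeline is constructed, so the registry must be reachable here.
    PCollection<KafkaRecord<String, GenericRecord>> orders =
        pipeline.apply(
            "ReadOrders",
            KafkaIO.<String, GenericRecord>read()
                .withBootstrapServers(bootstrapServers)
                .withTopic("orders") // hypothetical topic
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(
                    ConfluentSchemaRegistryDeserializerProvider.of(
                        registryUrl, "orders-value", null /* latest */, registryConfig)));

    // Second topic: a separate KafkaIO instance with a different subject/schema.
    PCollection<KafkaRecord<String, GenericRecord>> payments =
        pipeline.apply(
            "ReadPayments",
            KafkaIO.<String, GenericRecord>read()
                .withBootstrapServers(bootstrapServers)
                .withTopic("payments") // hypothetical topic
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(
                    ConfluentSchemaRegistryDeserializerProvider.of(
                        registryUrl, "payments-value", null /* latest */, registryConfig)));

    // orders and payments now carry different Avro schemas and can be
    // processed by separate downstream transforms.
    pipeline.run().waitUntilFinish();
  }
}
```

Since each read gets its coder from its own deserializer provider, the two PCollections stay independent and no shared coder has to cover both schemas.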

--
Alexey

> On 9 Aug 2022, at 22:28, Sigalit Eliazov <e.siga...@gmail.com> wrote:
>
> Thanks for your response.
> We have different messages with separate schemas.
>
> I'll review the suggested solution.
> BR
> Sigalit
>
> On Tue, Aug 9, 2022 at 3:40 PM Moritz Mack <mm...@talend.com> wrote:
> > Hi Sigalit,
> >
> > Could you explain in a bit more detail what you mean by 2 different types of messages?
> > Do they share the same schema, e.g. using a union / one-of type? Or are you in fact talking about different messages with separate schemas (e.g. discriminated using a message header)?
> >
> > The recommended usage (at least with Confluent) is to use one schema per topic. Using the Confluent registry it’s fairly simple then:
> >
> >     .withValueDeserializer(
> >         ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, subject, null /* latest */, config)))
> >
> > Most likely you have to implement a similar DeserializerProvider for Apicurio. You could also try using apicurio.registry.as-confluent, but that requires configuring your producers accordingly.
> >
> > In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider. That should lead you to a path forward.
> >
> > Best,
> > Moritz
> >
> > On 09.08.22, 13:08, "Sigalit Eliazov" <e.siga...@gmail.com> wrote:
> >
> > > Hi all
> > >
> > > We have a single Kafka topic which is used to receive 2 different types of messages.
> > > These 2 messages are Avro.
> > >
> > > So when reading messages from Kafka I used the GenericRecord:
> > >
> > >     KafkaIO.<String, GenericRecord>read()
> > >         .withBootstrapServers(bootstrapServers)
> > >         .withTopic(topic)
> > >         .withConsumerConfigUpdates(ImmutableMap.of(
> > >             SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
> > >             ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
> > >             SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
> > >         ))
> > >         .withKeyDeserializer(StringDeserializer.class)
> > >
> > > I am not sure how to define the withValueDeserializer and coder.
> > > I tried to read the message as GenericRecord but it fails with
> > > "Could not extract the Kafka Deserializer type from class io.apicurio.registry.serde.avro.AvroKafkaDeserialize"
> > > I am using Apicurio as the schema registry.
> > >
> > > Thanks
> > > Sigalit