Oops, my bad, I misread the initial question. As Moritz pointed out, you have only one topic with two different schemas…
I don't think this is supported by KafkaIO out of the box. In this case, you need to either write your own deserialiser that distinguishes the schema of every input message, split this topic into two so that each topic contains messages with only one schema, or use an Avro union as suggested above.

--
Alexey

> On 10 Aug 2022, at 15:03, Alexey Romanenko wrote:
>
> If you have two topics with different schemas in your pipeline, then you need
> to read them separately with two different KafkaIO instances and configure
> each instance with a proper deserialiser based on its schema.
>
> --
> Alexey
>
>> On 9 Aug 2022, at 22:28, Sigalit Eliazov wrote:
>>
>> Thanks for your response.
>> We have different messages with separate schemas.
>>
>> I'll review the suggested solution.
>> BR
>> Sigalit
>>
>> On Tue, Aug 9, 2022 at 3:40 PM Moritz Mack wrote:
>> Hi Sigalit,
>>
>> Could you explain in a bit more detail what you mean by 2 different types of
>> messages?
>>
>> Do they share the same schema, e.g. using a union / one-of type? Or are you
>> in fact talking about different messages with separate schemas (e.g.
>> discriminated using a message header)?
>>
>> The recommended usage (at least with Confluent) is to use one schema per
>> topic. With the Confluent registry it's then fairly simple:
>>
>>   .withValueDeserializer(
>>       ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, subject, null /* latest */, config))
>>
>> Most likely you will have to implement a similar DeserializerProvider for
>> Apicurio. You could also try using apicurio.registry.as-confluent, but that
>> requires configuring your producers accordingly.
>>
>> In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider.
>> That should show you a path forward.
>>
>> Best,
>> Moritz
>>
>> On 09.08.22, 13:08, "Sigalit Eliazov" wrote:
>>
>> Hi all,
>>
>> We have a single Kafka topic which is used to receive 2 different types of
>> messages. Both message types are Avro.
>> So when reading messages from Kafka I used GenericRecord:
>>
>>   KafkaIO.<String, GenericRecord>read()
>>       .withBootstrapServers(bootstrapServers)
>>       .withTopic(topic)
>>       .withConsumerConfigUpdates(ImmutableMap.of(
>>           SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
>>           ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
>>           SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
>>       ))
>>       .withKeyDeserializer(StringDeserializer.class)
>>
>> I am not sure how to define the withValueDeserializer and coder.
>> I tried to read the message as GenericRecord, but it fails with
>> "Could not extract the Kafka Deserializer type from class io.apicurio.registry.serde.avro.AvroKafkaDeserialize"
>> I am using Apicurio as the schema registry.
>>
>> Thanks
>> Sigalit
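
The first option above, writing your own deserialiser that distinguishes the schema of every input message, comes down to reading something from each record that identifies its schema and dispatching on it. Below is a minimal, dependency-free sketch of that dispatch step, assuming the Confluent wire format (magic byte 0, then a 4-byte big-endian schema ID, then the Avro body). With Apicurio this presumably means enabling the apicurio.registry.as-confluent option Moritz mentions, since Apicurio's default framing may use a different ID width; check your serde configuration. SchemaIdRouter, register and frame are hypothetical names, not Beam or Apicurio API. In a real pipeline the decoders would be Avro readers for the two schemas, and this logic would sit inside an org.apache.kafka.common.serialization.Deserializer implementation.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Sketch only: routes a record value to a per-schema decoder using the schema ID
 * embedded in the payload. Assumes the Confluent wire format (magic byte 0x0,
 * then a 4-byte big-endian schema ID, then the encoded body).
 * The class and its decoders are hypothetical, not part of Beam or Apicurio.
 */
final class SchemaIdRouter<T> {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    private final Map<Integer, Function<byte[], T>> decoders = new HashMap<>();

    /** Registers the decoder to use for one schema ID. */
    SchemaIdRouter<T> register(int schemaId, Function<byte[], T> decoder) {
        decoders.put(schemaId, decoder);
        return this;
    }

    /** Reads the header, looks up the decoder for its schema ID and decodes the body. */
    T decode(byte[] value) {
        if (value == null || value.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Value is too short to carry a schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        Function<byte[], T> decoder = decoders.get(schemaId);
        if (decoder == null) {
            throw new IllegalArgumentException("No decoder registered for schema ID " + schemaId);
        }
        return decoder.apply(Arrays.copyOfRange(value, HEADER_LENGTH, value.length));
    }

    /** Builds a value framed the same way; useful for tests. */
    static byte[] frame(int schemaId, byte[] body) {
        return ByteBuffer.allocate(HEADER_LENGTH + body.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(body)
                .array();
    }
}
```

Usage would be along the lines of registering one decoder per schema ID, e.g. `new SchemaIdRouter<MyType>().register(idA, decoderA).register(idB, decoderB)`, and calling `decode(value)` for every record; an unknown ID fails loudly rather than being decoded with the wrong schema.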

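Moritz also mentions messages discriminated using a message header. If the producers set such a header, the deserialiser can pick the decoder from it instead of from the payload. This is again a hypothetical, dependency-free sketch: the header name message-type and the type values are assumptions, and headers are modelled as a plain Map<String, byte[]> (in Kafka they would arrive as org.apache.kafka.common.header.Headers).

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Sketch only: selects a decoder from a discriminator header rather than the payload.
 * The header name and type values are hypothetical; producers would have to set them.
 */
final class HeaderRouter<T> {
    static final String TYPE_HEADER = "message-type"; // hypothetical header name

    private final Map<String, Function<byte[], T>> decodersByType;

    HeaderRouter(Map<String, Function<byte[], T>> decodersByType) {
        this.decodersByType = new HashMap<>(decodersByType); // defensive copy
    }

    /** Decodes the value with the decoder registered for the record's type header. */
    T decode(Map<String, byte[]> headers, byte[] value) {
        byte[] rawType = headers.get(TYPE_HEADER);
        if (rawType == null) {
            throw new IllegalArgumentException("Missing header: " + TYPE_HEADER);
        }
        String type = new String(rawType, StandardCharsets.UTF_8);
        Function<byte[], T> decoder = decodersByType.get(type);
        if (decoder == null) {
            throw new IllegalArgumentException("Unknown message type: " + type);
        }
        return decoder.apply(value);
    }
}
```

The trade-off is that every producer writing to the topic must set the header consistently; the payload itself stays untouched.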