Hi Maminspapin, I haven't worked with Kafka/Flink, yet. But have you had a look at the docs about the DeserializationSchema [1]? It mentions ConfluentRegistryAvroDeserializationSchema. Is this something you're looking for?
Best, Matthias [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema On Tue, Mar 30, 2021 at 6:55 AM Maminspapin <un...@mail.ru> wrote: > I tried this: > > 1. Schema (found in stackoverflow) > > class GenericRecordSchema implements > KafkaDeserializationSchema<GenericRecord> { > > private String registryUrl; > private transient KafkaAvroDeserializer deserializer; > > public GenericRecordSchema(String registryUrl) { > this.registryUrl = registryUrl; > } > > @Override > public boolean isEndOfStream(GenericRecord nextElement) { > return false; > } > > @Override > public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> > consumerRecord) throws Exception { > checkInitialized(); > return (GenericRecord) > deserializer.deserialize(consumerRecord.topic(), consumerRecord.value()); > } > > @Override > public TypeInformation<GenericRecord> getProducedType() { > return TypeExtractor.getForClass(GenericRecord.class); > } > > private void checkInitialized() { > if (deserializer == null) { > Map<String, Object> props = new HashMap<>(); > > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, > registryUrl); > > props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false); > SchemaRegistryClient client = > new CachedSchemaRegistryClient( > registryUrl, > AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); > deserializer = new KafkaAvroDeserializer(client, props); > } > } > } > > 2. Consumer > > private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) > { > > return new FlinkKafkaConsumer<>( > topic, > new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"), > getConsumerProperties()); > } > > But when I start the app, the following error is happen: > > com.esotericsoftware.kryo.KryoException: > java.lang.UnsupportedOperationException > Serialization trace: > reserved (org.apache.avro.Schema$Field) > fieldMap (org.apache.avro.Schema$RecordSchema) > schema (org.apache.avro.generic.GenericData$Record) > at > > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) > at > > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at > > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at > > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) > at > > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) > at > > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69) > at > > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > at > > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > at > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) > at > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) > at > > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) > at > > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) > at > > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) > at > > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) > at > > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) > at > > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) > Caused by: java.lang.UnsupportedOperationException > at > java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) > at > > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) > at > > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at > > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > ... 26 more > > > Not solving with: > env.getConfig().disableForceKryo(); > env.getConfig().enableForceAvro(); > > > Any idea? > > Thanks > > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/