Hi Maminspapin,
I haven't worked with Kafka/Flink yet, but have you had a look at the docs
about the DeserializationSchema [1]? They
mention ConfluentRegistryAvroDeserializationSchema. Is this something
you're looking for?
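
I haven't tried it myself, but based on the docs a minimal sketch could look
like this (assuming the Avro reader schema is known when the job is built;
the topic name is a placeholder, the registry URL is taken from your example,
and you need the flink-avro-confluent-registry dependency if I'm not
mistaken):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Assumption: the Avro schema of the topic is available as a JSON string
// (readerSchemaJson is just a placeholder name).
Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);

FlinkKafkaConsumer<GenericRecord> consumer =
        new FlinkKafkaConsumer<>(
                "your-topic",                        // placeholder topic name
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        readerSchema, "http://xxx.xx.xxx.xx:8081"),
                getConsumerProperties());            // your existing properties method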

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
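
P.S.: If I read the stack trace below correctly, the job does not fail while
reading from Kafka but when Flink copies the GenericRecord between chained
operators: TypeExtractor.getForClass(GenericRecord.class) gives Flink no
usable type information, so it falls back to Kryo, which cannot handle Avro's
Schema classes. In case you want to keep your custom
KafkaDeserializationSchema, an (untested) alternative sketch would be to
return Flink's Avro type information from getProducedType(); this assumes the
reader schema is available in the class, here as a hypothetical JSON string
field readerSchemaJson:

// import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;  (from flink-avro)

private String readerSchemaJson;   // hypothetical field holding the Avro schema as JSON

@Override
public TypeInformation<GenericRecord> getProducedType() {
    // GenericRecordAvroTypeInfo makes Flink serialize GenericRecords with its
    // Avro serializer instead of falling back to Kryo.
    Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);
    return new GenericRecordAvroTypeInfo(readerSchema);
}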

On Tue, Mar 30, 2021 at 6:55 AM Maminspapin <un...@mail.ru> wrote:

> I tried this:
>
> 1. Schema (found in stackoverflow)
>
> class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {
>
>     private String registryUrl;
>     private transient KafkaAvroDeserializer deserializer;
>
>     public GenericRecordSchema(String registryUrl) {
>         this.registryUrl = registryUrl;
>     }
>
>     @Override
>     public boolean isEndOfStream(GenericRecord nextElement) {
>         return false;
>     }
>
>     @Override
>     public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
>         checkInitialized();
>         return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeExtractor.getForClass(GenericRecord.class);
>     }
>
>     private void checkInitialized() {
>         if (deserializer == null) {
>             Map<String, Object> props = new HashMap<>();
>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>             SchemaRegistryClient client =
>                     new CachedSchemaRegistryClient(
>                             registryUrl,
>                             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>             deserializer = new KafkaAvroDeserializer(client, props);
>         }
>     }
> }
>
> 2. Consumer
>
> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
>     return new FlinkKafkaConsumer<>(
>             topic,
>             new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
>             getConsumerProperties());
> }
>
> But when I start the app, the following error happens:
>
> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>         at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
> Caused by: java.lang.UnsupportedOperationException
>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         ... 26 more
>
>
> It is not solved by:
> env.getConfig().disableForceKryo();
> env.getConfig().enableForceAvro();
>
>
> Any idea?
>
> Thanks
>
>
>
>
