The issue is that the record contains an unmodifiable collection, which
the Kryo serialiser tries to modify: it first instantiates the object and
then adds items to the collection (iirc).

> Caused by: java.lang.UnsupportedOperationException
>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
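
To make the failure mode concrete, here is a minimal JDK-only sketch (no Kryo or Flink involved) of the "create first, then add" pattern hitting an unmodifiable collection. The names InPlaceFillDemo and fillInPlace are made up for illustration; the code only mimics what the collection serialiser appears to do according to the stack trace, it is not Kryo's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class InPlaceFillDemo {

    // Hypothetical helper: adds the items to an existing collection in place
    // and reports whether the collection accepted them.
    public static boolean fillInPlace(Collection<String> target, List<String> items) {
        try {
            for (String item : items) {
                target.add(item);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Unmodifiable wrappers reject add() with this exception.
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b");
        Collection<String> mutable = new ArrayList<>();
        Collection<String> readOnly =
                Collections.unmodifiableCollection(new ArrayList<String>());

        System.out.println("ArrayList accepted add(): " + fillInPlace(mutable, items));
        System.out.println("unmodifiable accepted add(): " + fillInPlace(readOnly, items));
    }
}
```

The second call fails for the same reason as in the trace: add() is invoked on the read-only wrapper rather than on the collection behind it.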


Without knowing exactly what you are trying to deserialise, I can only
give a generic answer, which is to try something like:


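> // UnmodifiableCollectionsSerializer comes from the de.javakaffee kryo-serializers library.
> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
> // Class.forName throws the checked ClassNotFoundException.
> Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
> see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);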
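
As far as I understand, a serialiser like this sidesteps the problem by never calling add() on the wrapper: it rebuilds a plain mutable collection and wraps it again at the end. A rough JDK-only sketch of that idea follows. It is not the library's actual code, and RewrapDemo and copyUnmodifiable are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class RewrapDemo {

    // Hypothetical helper: copies the elements into a fresh mutable list and
    // wraps that copy, so add() is only ever called on the ArrayList.
    public static <T> Collection<T> copyUnmodifiable(Collection<T> source) {
        Collection<T> mutableCopy = new ArrayList<>();
        for (T element : source) {
            mutableCopy.add(element);
        }
        return Collections.unmodifiableCollection(mutableCopy);
    }

    public static void main(String[] args) {
        Collection<String> original =
                Collections.unmodifiableCollection(Arrays.asList("a", "b"));
        Collection<String> copy = copyUnmodifiable(original);

        System.out.println("copied elements: " + copy.size());
        System.out.println("contains b: " + copy.contains("b"));
    }
}
```

The copy is still read-only for callers, which is the property the original record relied on.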


An even better approach is to set up a local sandbox environment in Docker
with Kafka and a sink of your choice, run the application from the main
method in debug mode, and set a breakpoint right before it throws the
exception.

Kind regards,
Arian Rohani


On Wed, 31 Mar 2021 at 13:27, Matthias Pohl <matth...@ververica.com> wrote:

> Hi Maminspapin,
> I haven't worked with Kafka/Flink, yet. But have you had a look at the
> docs about the DeserializationSchema [1]? It
> mentions ConfluentRegistryAvroDeserializationSchema. Is this something
> you're looking for?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin <un...@mail.ru> wrote:
>
>> I tried this:
>>
>> 1. Schema (found in stackoverflow)
>>
>> class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {
>>
>>     private String registryUrl;
>>     private transient KafkaAvroDeserializer deserializer;
>>
>>     public GenericRecordSchema(String registryUrl) {
>>         this.registryUrl = registryUrl;
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
>>         checkInitialized();
>>         return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>>     }
>>
>>     @Override
>>     public TypeInformation<GenericRecord> getProducedType() {
>>         return TypeExtractor.getForClass(GenericRecord.class);
>>     }
>>
>>     private void checkInitialized() {
>>         if (deserializer == null) {
>>             Map<String, Object> props = new HashMap<>();
>>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>             SchemaRegistryClient client =
>>                     new CachedSchemaRegistryClient(
>>                             registryUrl,
>>                             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>             deserializer = new KafkaAvroDeserializer(client, props);
>>         }
>>     }
>> }
>>
>> 2. Consumer
>>
>> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
>>
>>     return new FlinkKafkaConsumer<>(
>>             topic,
>>             new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
>>             getConsumerProperties());
>> }
>>
>> But when I start the app, the following error occurs:
>>
>> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>> Serialization trace:
>> reserved (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>>         at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>> Caused by: java.lang.UnsupportedOperationException
>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         ... 26 more
>>
>>
>> This did not solve it:
>> env.getConfig().disableForceKryo();
>> env.getConfig().enableForceAvro();
>>
>>
>> Any idea?
>>
>> Thanks
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
