Arian gave good pointers, but I'd go even further: you should have ITCases
that pretty much just execute a mini job against a Docker-based Kafka and
run automatically.
I strongly recommend checking out Testcontainers [1]; it makes writing such
a test a really smooth experience.

[1] https://www.testcontainers.org/modules/kafka/


On Wed, Mar 31, 2021 at 2:29 PM Arian Rohani <arianroh...@gmail.com> wrote:

> The issue at hand is that the record contains an unmodifiable collection,
> which the Kryo serialiser attempts to modify by first initialising the
> object and then adding items to the collection (iirc).
>
> Caused by: java.lang.UnsupportedOperationException
>>         at
>> java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>
>
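
To make that failure mode concrete, here is a minimal stdlib-only sketch (no
Kryo or Flink involved). The class name and fillByAdding are hypothetical;
the method is only a stand-in for a generic collection reader that, as the
stack trace suggests, populates its target through add().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class UnmodifiableCopyDemo {

    // Hypothetical stand-in for a generic collection reader: it fills the
    // target collection element by element via add().
    // Returns false if the target rejects modification.
    static boolean fillByAdding(Collection<String> target, List<String> items) {
        try {
            for (String item : items) {
                target.add(item);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b");

        Collection<String> modifiable = new ArrayList<String>();
        Collection<String> unmodifiable =
                Collections.unmodifiableCollection(new ArrayList<String>());

        System.out.println(fillByAdding(modifiable, items));   // add() succeeds
        System.out.println(fillByAdding(unmodifiable, items)); // add() is rejected
    }
}
```

The second call fails on the first add(), which is the same
UnsupportedOperationException that appears as the root cause in the trace.
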
> Without knowing the specifics of what it is exactly you are trying to
> deserialise I can only attempt to give a generic answer which is to try
> something like:
>
>
>> StreamExecutionEnvironment see =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> Class<?> unmodColl =
>> Class.forName("java.util.Collections$UnmodifiableCollection");
>> see.getConfig().addDefaultKryoSerializer(unmodColl,
>> UnmodifiableCollectionsSerializer.class);
>
>
> An even better approach is to set up a local sandbox environment in Docker
> with Kafka and a sink of your choice, then simply run the application
> from the main method in debug mode and set a breakpoint right before it
> throws the exception.
>
> Kind regards,
> Arian Rohani
>
>
> On Wed, Mar 31, 2021 at 1:27 PM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Maminspapin,
>> I haven't worked with Kafka/Flink yet, but have you had a look at the
>> docs about the DeserializationSchema [1]? They mention
>> ConfluentRegistryAvroDeserializationSchema. Is this what you're
>> looking for?
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>>
>> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin <un...@mail.ru> wrote:
>>
>>> I tried this:
>>>
>>> 1. Schema (found on Stack Overflow)
>>>
>>> class GenericRecordSchema
>>>         implements KafkaDeserializationSchema<GenericRecord> {
>>>
>>>     private String registryUrl;
>>>     private transient KafkaAvroDeserializer deserializer;
>>>
>>>     public GenericRecordSchema(String registryUrl) {
>>>         this.registryUrl = registryUrl;
>>>     }
>>>
>>>     @Override
>>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>>         return false;
>>>     }
>>>
>>>     @Override
>>>     public GenericRecord deserialize(
>>>             ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
>>>         checkInitialized();
>>>         return (GenericRecord) deserializer.deserialize(
>>>                 consumerRecord.topic(), consumerRecord.value());
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<GenericRecord> getProducedType() {
>>>         return TypeExtractor.getForClass(GenericRecord.class);
>>>     }
>>>
>>>     private void checkInitialized() {
>>>         if (deserializer == null) {
>>>             Map<String, Object> props = new HashMap<>();
>>>             props.put(
>>>                     AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>                     registryUrl);
>>>             props.put(
>>>                     KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>                     false);
>>>             SchemaRegistryClient client =
>>>                     new CachedSchemaRegistryClient(
>>>                             registryUrl,
>>>                             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>             deserializer = new KafkaAvroDeserializer(client, props);
>>>         }
>>>     }
>>> }
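
A side note on the transient field and checkInitialized() in the class
above: the schema object is shipped to the tasks via Java serialization, and
a transient field is skipped there, so it has to be rebuilt lazily on first
use. Below is a minimal stdlib-only sketch of that pattern. Helper,
LazySchema, roundTrip and the localhost URL are all hypothetical stand-ins,
not Flink or Confluent classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class LazyTransientDemo {

    /** Hypothetical non-serializable helper, standing in for a deserializer. */
    static final class Helper {
        final String url;

        Helper(String url) {
            this.url = url;
        }
    }

    /** Hypothetical schema-like class that is shipped via Java serialization. */
    static final class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String registryUrl;
        // Skipped by Java serialization, so it is null in every deserialized copy.
        private transient Helper helper;

        LazySchema(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        boolean isInitialized() {
            return helper != null;
        }

        // Mirrors checkInitialized(): build the helper on first use.
        String helperUrl() {
            if (helper == null) {
                helper = new Helper(registryUrl);
            }
            return helper.url;
        }
    }

    /** Serializes and deserializes the schema, as shipping it to a worker would. */
    static LazySchema roundTrip(LazySchema schema) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(schema);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazySchema) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazySchema original = new LazySchema("http://localhost:8081"); // assumed URL
        original.helperUrl(); // initializes the helper on the original instance
        LazySchema copy = roundTrip(original);
        System.out.println(copy.isInitialized()); // helper was not carried over
        System.out.println(copy.helperUrl());     // rebuilt lazily from registryUrl
    }
}
```

The copy comes back without its helper and only recreates it when
helperUrl() is called, which is why the original class checks for null
before every deserialize() call.
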
>>>
>>> 2. Consumer
>>>
>>> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
>>>
>>>     return new FlinkKafkaConsumer<>(
>>>             topic,
>>>             new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
>>>             getConsumerProperties());
>>> }
>>>
>>> But when I start the app, the following error occurs:
>>>
>>> com.esotericsoftware.kryo.KryoException:
>>> java.lang.UnsupportedOperationException
>>> Serialization trace:
>>> reserved (org.apache.avro.Schema$Field)
>>> fieldMap (org.apache.avro.Schema$RecordSchema)
>>> schema (org.apache.avro.generic.GenericData$Record)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>         at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>         at
>>>
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>>>         at
>>>
>>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>>>         at
>>>
>>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>>         at
>>>
>>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>>         at
>>>
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>         at
>>>
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>         at
>>>
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>>>         at
>>>
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>>>         at
>>>
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>>>         at
>>>
>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>>>         at
>>>
>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>>         at
>>>
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>>         at
>>>
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>         at
>>>
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>         at
>>>
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>>> Caused by: java.lang.UnsupportedOperationException
>>>         at
>>> java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>         at
>>>
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>         ... 26 more
>>>
>>>
>>> This did not solve it:
>>> env.getConfig().disableForceKryo();
>>> env.getConfig().enableForceAvro();
>>>
>>>
>>> Any idea?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
