Arian gave good pointers, but I'd go even further: you should have ITCases that pretty much just execute a mini job against a Docker-based Kafka and run automatically. I strongly recommend checking out Testcontainers [1]; it makes writing such a test a really smooth experience.
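Even before a Kafka-backed test exists, the failure mode Arian describes in the quoted mail can be pinned down with the JDK alone. What follows is only a minimal sketch: `UnmodifiableCopyDemo`, `refill` and `refillFails` are hypothetical names made up for illustration, and `refill` merely imitates the "initialise, then add items" strategy from Arian's explanation; it is not Kryo's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

class UnmodifiableCopyDemo {

    // Hypothetical stand-in for a copy routine that fills an already
    // created target collection element by element.
    public static <T> void refill(Collection<T> target, Collection<? extends T> source) {
        for (T item : source) {
            target.add(item); // an unmodifiable view rejects this call
        }
    }

    // Returns true if refilling the given target throws
    // UnsupportedOperationException, false if the copy succeeds.
    public static boolean refillFails(Collection<String> target) {
        try {
            refill(target, Arrays.asList("reserved", "fieldMap"));
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A plain ArrayList accepts add(), so the copy succeeds.
        System.out.println(refillFails(new ArrayList<String>())); // prints "false"

        // An unmodifiable view throws on add(), matching the "Caused by" in the trace.
        System.out.println(refillFails(
                Collections.unmodifiableCollection(new ArrayList<String>()))); // prints "true"
    }
}
```

This only illustrates why an add-based copy cannot work on such a collection; whether the registered serializer really fixes the job is still something the ITCase against a real Kafka should confirm.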
[1] https://www.testcontainers.org/modules/kafka/

On Wed, Mar 31, 2021 at 2:29 PM Arian Rohani <arianroh...@gmail.com> wrote:

> The issue at hand is that the record contains an unmodifiable collection
> which the kryo serialiser attempts to modify by first initialising the
> object and then adding items to the collection (iirc).
>
>> Caused by: java.lang.UnsupportedOperationException
>>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>
> Without knowing the specifics of what exactly you are trying to
> deserialise, I can only attempt to give a generic answer, which is to try
> something like:
>
>> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>> Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
>> see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
>
> An even better approach is to set up a local sandbox environment in Docker
> with Kafka and a sink of your choice, and simply run the application
> from the main method in debug mode, setting a breakpoint right before it
> throws the exception.
>
> Kind regards,
> Arian Rohani
>
>
> On Wed, Mar 31, 2021 at 13:27, Matthias Pohl <matth...@ververica.com> wrote:
>
>> Hi Maminspapin,
>> I haven't worked with Kafka/Flink yet. But have you had a look at the
>> docs about the DeserializationSchema [1]? It
>> mentions ConfluentRegistryAvroDeserializationSchema. Is this something
>> you're looking for?
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>>
>> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin <un...@mail.ru> wrote:
>>
>>> I tried this:
>>>
>>> 1. Schema (found on Stack Overflow)
>>>
>>> class GenericRecordSchema implements
>>>         KafkaDeserializationSchema<GenericRecord> {
>>>
>>>     private String registryUrl;
>>>     private transient KafkaAvroDeserializer deserializer;
>>>
>>>     public GenericRecordSchema(String registryUrl) {
>>>         this.registryUrl = registryUrl;
>>>     }
>>>
>>>     @Override
>>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>>         return false;
>>>     }
>>>
>>>     @Override
>>>     public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
>>>             throws Exception {
>>>         checkInitialized();
>>>         return (GenericRecord)
>>>             deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<GenericRecord> getProducedType() {
>>>         return TypeExtractor.getForClass(GenericRecord.class);
>>>     }
>>>
>>>     private void checkInitialized() {
>>>         if (deserializer == null) {
>>>             Map<String, Object> props = new HashMap<>();
>>>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>             SchemaRegistryClient client =
>>>                 new CachedSchemaRegistryClient(
>>>                     registryUrl,
>>>                     AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>             deserializer = new KafkaAvroDeserializer(client, props);
>>>         }
>>>     }
>>> }
>>>
>>> 2. Consumer
>>>
>>> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
>>>
>>>     return new FlinkKafkaConsumer<>(
>>>         topic,
>>>         new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
>>>         getConsumerProperties());
>>> }
>>>
>>> But when I start the app, the following error occurs:
>>>
>>> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>> Serialization trace:
>>> reserved (org.apache.avro.Schema$Field)
>>> fieldMap (org.apache.avro.Schema$RecordSchema)
>>> schema (org.apache.avro.generic.GenericData$Record)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>>> Caused by: java.lang.UnsupportedOperationException
>>>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>     ... 26 more
>>>
>>>
>>> This does not solve it:
>>> env.getConfig().disableForceKryo();
>>> env.getConfig().enableForceAvro();
>>>
>>>
>>> Any idea?
>>>
>>> Thanks
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>