I tried this: 1. Schema (found in stackoverflow)
class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> { private String registryUrl; private transient KafkaAvroDeserializer deserializer; public GenericRecordSchema(String registryUrl) { this.registryUrl = registryUrl; } @Override public boolean isEndOfStream(GenericRecord nextElement) { return false; } @Override public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception { checkInitialized(); return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value()); } @Override public TypeInformation<GenericRecord> getProducedType() { return TypeExtractor.getForClass(GenericRecord.class); } private void checkInitialized() { if (deserializer == null) { Map<String, Object> props = new HashMap<>(); props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl); props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false); SchemaRegistryClient client = new CachedSchemaRegistryClient( registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); deserializer = new KafkaAvroDeserializer(client, props); } } } 2. Consumer private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) { return new FlinkKafkaConsumer<>( topic, new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"), getConsumerProperties()); } But when I start the app, the following error is happen: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: reserved (org.apache.avro.Schema$Field) fieldMap (org.apache.avro.Schema$RecordSchema) schema (org.apache.avro.generic.GenericData$Record) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) Caused by: java.lang.UnsupportedOperationException at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ... 26 more Not solving with: env.getConfig().disableForceKryo(); env.getConfig().enableForceAvro(); Any idea? Thanks -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/