I tried this:

1. Schema (found in stackoverflow)

class GenericRecordSchema implements
KafkaDeserializationSchema<GenericRecord> {

    private String registryUrl;
    private transient KafkaAvroDeserializer deserializer;

    public GenericRecordSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]>
consumerRecord) throws Exception {
        checkInitialized();
        return (GenericRecord)
deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (deserializer == null) {
            Map<String, Object> props = new HashMap<>();
           
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
registryUrl);
           
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl,
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            deserializer = new KafkaAvroDeserializer(client, props);
        }
    }
}

2. Consumer

private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {

    return new FlinkKafkaConsumer<>(
            topic,
            new GenericRecordSchema("http://xxx.xx.xxx.xx:8081";),
            getConsumerProperties());
}

But when I start the app, the following error is happen:

com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
        at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
        at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
        at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
        at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
        at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
        at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.UnsupportedOperationException
        at 
java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
        at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
        at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 26 more


Not solving with:
env.getConfig().disableForceKryo();
env.getConfig().enableForceAvro();


Any idea?

Thanks




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to