The complete solution for the record ( that others can benefit from it). KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder() .setProperties(kafkaProps) .setProperty("ssl.truststore.type",trustStoreType) .setProperty("ssl.truststore.password",trustStorePassword) .setProperty("ssl.truststore.location",trustStoreLocation) .setProperty("security.protocol",securityProtocol) .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs) .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint) .setGroupId(groupId) .setTopics(kafkaInputTopic) .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata))) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) .build();
Op wo 9 feb. 2022 om 09:46 schreef HG <hanspeter.sl...@gmail.com>: > Sorry to have bothered everyone. > > This is the obvious solution: > > .setDeserializer(KafkaRecordDeserializationSchema.of(new > JSONKeyValueDeserializationSchema(false))) > > > Regards Hans-Peter > > > Op di 8 feb. 2022 om 21:56 schreef Roman Khachatryan <ro...@apache.org>: > >> Hi, >> >> setDeserializer() expects KafkaRecordDeserializationSchema; >> JSONKeyValueDeserializationSchema you provided is not compatible with >> it. >> You can convert it using [1] >> >> [1] >> >> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema- >> >> >> Regards, >> Roman >> >> On Tue, Feb 8, 2022 at 5:43 PM HG <hanspeter.sl...@gmail.com> wrote: >> > >> > Hi all, >> > >> > When I build this code: >> > >> > KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder() >> > .setProperties(kafkaProps) >> > .setProperty("ssl.truststore.type",trustStoreType) >> > .setProperty("ssl.truststore.password",trustStorePassword) >> > .setProperty("ssl.truststore.location",trustStoreLocation) >> > .setProperty("security.protocol",securityProtocol) >> > .setProperty("partition.discovery.interval.ms", >> partitionDiscoveryIntervalMs) >> > .setProperty("commit.offsets.on.checkpoint", >> commitOffsetsOnCheckpoint) >> > .setGroupId(groupId) >> > .setTopics(kafkaInputTopic) >> > .setDeserializer(new JSONKeyValueDeserializationSchema(false)) >> > >> >> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) >> > .build(); >> > >> > >> > I get: >> > This error: >> > >> > error: incompatible types: >> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema >> cannot be converted to >> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode> >> > .setDeserializer(new >> JSONKeyValueDeserializationSchema(false)) >> > >> > >> > What am I doing wrong? >> > As per the documentation JSONKeyValueDeserializationSchema returns an >> ObjectNode. >> > >> > Regards Hans >> > >> >