Hello, Sorry for the late reply. I have checked the issue and it seems to be a type issue as the exception suggests. What happens is that the JSONKeyValueDeserializationSchema included in flink implements a KafkaDeserializationSchema. The .setDeserializer method expects a Deserialization schema though. The JSONKeyValueDeserializationSchema might have been left from an older flink version.
My recommendation would be to implement your own JSONKeyValueDeserializationSchema that implements a Deserialization schema. You even should be able to copy the implementation from the flink included JSONKeyValueDeserializationSchema and change KafkaDeserializationSchema to DeserializationSchema. If you will have issues with implementing this, please let me know and I will provide you with the code. Best regards Kamil On Mon, 7 Feb 2022, 15:15 HG, <hanspeter.sl...@gmail.com> wrote: > Hello Kamil et all, > > When I build this code: > > KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder() > .setProperties(kafkaProps) > .setProperty("ssl.truststore.type",trustStoreType) > .setProperty("ssl.truststore.password",trustStorePassword) > .setProperty("ssl.truststore.location",trustStoreLocation) > .setProperty("security.protocol",securityProtocol) > .setProperty("partition.discovery.interval.ms", > partitionDiscoveryIntervalMs) > .setProperty("commit.offsets.on.checkpoint", > commitOffsetsOnCheckpoint) > .setGroupId(groupId) > .setTopics(kafkaInputTopic) > .setDeserializer(new JSONKeyValueDeserializationSchema(false)) > > .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) > .build(); > > > I get: > This error: > > error: incompatible types: > org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema > cannot be converted to > org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode> > .setDeserializer(new > JSONKeyValueDeserializationSchema(false)) > > > What am I doing wrong? > As per the documentation JSONKeyValueDeserializationSchema returns an > ObjectNode. > > Regards Hans-Peter > > > > Op vr 14 jan. 2022 om 20:32 schreef Kamil ty <kamilt...@gmail.com>: > >> Hello Hans, >> >> As far as I know the JSONKeyValueDeserializationSchema returns a Jackson >> ObjectNode. Below I have included an example based on Flink stable >> documentation. >> >> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder() >> .setBootstrapServers(brokers) >> .setTopics("input-topic") >> .setGroupId("my-group") >> .setStartingOffsets(OffsetsInitializer.earliest()) >> .setDeserializer(new JSONKeyValueDeserializationSchema(false)) >> .build(); >> >> DataStream<ObjectNode> ds = env.fromSource(source, >> WatermarkStrategy.noWatermarks(), "Kafka Source"); >> // Below we access the JSON field stored in the >> ObjectNode. >> DataStream<String> processedDs = ds.map(record -> >> record.get("value").get("my-field").asText()); >> >> It is also possible to implement your own deserialization schema that for >> eg. could turn each record into a POJO. You can do this by implementing the >> KafkaDeserializationSchema >> (Flink : 1.14-SNAPSHOT API) (apache.org) >> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.html>. >> If you are only interested in the value of the Kafka record, you can also >> extend the AbstractDeserializationSchema (Flink : 1.14-SNAPSHOT API) >> (apache.org) >> <https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.html> >> and >> use .setValueOnlyDeserializer(new CustomDeserializer()). There is also a >> different API that you could use for this which is specified here: >> KafkaSourceBuilder >> (Flink : 1.14-SNAPSHOT API) (apache.org) >> <https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html>. >> Although the customDeserializer will be the same for older Flink versions, >> the Kafka Source has appeared recently, to learn about the previous kafka >> source (FlinkKafkaConsumer) see: Kafka | Apache Flink >> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction> >> . >> >> Best Regards >> Kamil >> >> On Fri, 14 Jan 2022 at 18:37, HG <hanspeter.sl...@gmail.com> wrote: >> >>> Hi, >>> >>> Before starting programming myself I'd like to know whether there are >>> good examples with deserialization of JSON that I can borrow. >>> The structure of the JSON is nested with multiple levels. >>> >>> Any references? >>> >>> 'better well stolen than badly invented myself' we'd say in Dutch😁 >>> >>> Regards Hans >>> >>