Hello Kamil et al.,

When I build this code:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(new JSONKeyValueDeserializationSchema(false))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();

I get this error:

error: incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
        .setDeserializer(new JSONKeyValueDeserializationSchema(false))

What am I doing wrong? As per the documentation, JSONKeyValueDeserializationSchema returns an ObjectNode.

Regards,
Hans-Peter
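For reference, a minimal sketch of one possible fix, assuming the Flink 1.14 Kafka connector: the new KafkaSource builder's setDeserializer expects a KafkaRecordDeserializationSchema, and the connector's KafkaRecordDeserializationSchema.of(...) factory adapts a KafkaDeserializationSchema (which is what JSONKeyValueDeserializationSchema implements) into that type. kafkaProps, groupId and kafkaInputTopic are the same variables as in the snippet above; the SSL and discovery properties can stay in kafkaProps or be set via setProperty as before.

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        // Adapt the KafkaDeserializationSchema to the
        // KafkaRecordDeserializationSchema type the builder expects.
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(false)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();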
On Fri, 14 Jan 2022 at 20:32, Kamil ty <kamilt...@gmail.com> wrote:

> Hello Hans,
>
> As far as I know the JSONKeyValueDeserializationSchema returns a Jackson
> ObjectNode. Below I have included an example based on the Flink stable
> documentation.
>
> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>         .setBootstrapServers(brokers)
>         .setTopics("input-topic")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>         .build();
>
> DataStream<ObjectNode> ds = env.fromSource(source,
>         WatermarkStrategy.noWatermarks(), "Kafka Source");
> // Below we access the JSON field stored in the ObjectNode.
> DataStream<String> processedDs = ds.map(record ->
>         record.get("value").get("my-field").asText());
>
> It is also possible to implement your own deserialization schema that,
> for example, could turn each record into a POJO. You can do this by
> implementing KafkaDeserializationSchema
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.html>.
> If you are only interested in the value of the Kafka record, you can also
> extend AbstractDeserializationSchema
> <https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.html>
> and use .setValueOnlyDeserializer(new CustomDeserializer()). There is also
> a different API that you could use for this, specified in the
> KafkaSourceBuilder javadoc
> <https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html>.
> Although the custom deserializer will be the same for older Flink
> versions, the KafkaSource has appeared only recently; to learn about the
> previous Kafka source (FlinkKafkaConsumer) see the Kafka connector
> documentation
> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction>.
>
> Best Regards
> Kamil
>
> On Fri, 14 Jan 2022 at 18:37, HG <hanspeter.sl...@gmail.com> wrote:
>
>> Hi,
>>
>> Before starting programming myself I'd like to know whether there are
>> good examples with deserialization of JSON that I can borrow.
>> The structure of the JSON is nested with multiple levels.
>>
>> Any references?
>>
>> 'better well stolen than badly invented myself' we'd say in Dutch😁
>>
>> Regards Hans
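As an illustration of the value-only custom deserializer Kamil suggests above, here is a minimal sketch, not taken from the thread: the Event POJO, its fields, the topic, and the broker address are hypothetical, and Jackson is assumed to be on the classpath. The schema extends AbstractDeserializationSchema and is wired in with setValueOnlyDeserializer; since it parses the raw bytes itself, it handles nested JSON of the kind asked about in the original question.

import java.io.IOException;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical POJO for a nested payload such as
// {"user": {"name": "alice"}, "amount": 12.5}
class Event {
    public String userName;
    public double amount;
}

public class EventDeserializer extends AbstractDeserializationSchema<Event> {

    // ObjectMapper is not serializable, so create it lazily on the worker.
    private transient ObjectMapper mapper;

    @Override
    public Event deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        JsonNode root = mapper.readTree(message);
        Event event = new Event();
        // path() returns a "missing node" instead of null for absent
        // fields, so chaining through nested levels is safe.
        event.userName = root.path("user").path("name").asText();
        event.amount = root.path("amount").asDouble();
        return event;
    }

    // Example wiring inside a job (broker address and topic are placeholders).
    public static KafkaSource<Event> buildSource() {
        return KafkaSource.<Event>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new EventDeserializer())
                .build();
    }
}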