Hi Kamil, Aeden and others It was already answered This was the complete solution:
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder() .setProperties(kafkaProps) .setProperty("ssl.truststore.type",trustStoreType) .setProperty("ssl.truststore.password",trustStorePassword) .setProperty("ssl.truststore.location",trustStoreLocation) .setProperty("security.protocol",securityProtocol) .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs) .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint) .setGroupId(inputGroupId) .setTopics(kafkaInputTopic) .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata))) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) .build(); Thank a lot anyway. Op do 3 mrt. 2022 om 20:46 schreef Aeden Jameson <aeden.jame...@gmail.com>: > I believe you can solve this iss with, > > .setDeserializer(KafkaRecordDeserializationSchema.of(new > JSONKeyValueDeserializationSchema(false))) > > > On Thu, Mar 3, 2022 at 8:07 AM Kamil ty <kamilt...@gmail.com> wrote: > > > > Hello, > > > > Sorry for the late reply. I have checked the issue and it seems to be a > type issue as the exception suggests. What happens is that the > JSONKeyValueDeserializationSchema included in flink implements a > KafkaDeserializationSchema. The .setDeserializer method expects a > Deserialization schema though. The JSONKeyValueDeserializationSchema might > have been left from an older flink version. > > > > My recommendation would be to implement your own > JSONKeyValueDeserializationSchema that implements a Deserialization schema. > You even should be able to copy the implementation from the flink included > JSONKeyValueDeserializationSchema and change KafkaDeserializationSchema to > DeserializationSchema. > > > > If you will have issues with implementing this, please let me know and I > will provide you with the code. > > > > Best regards > > Kamil > > > > On Mon, 7 Feb 2022, 15:15 HG, <hanspeter.sl...@gmail.com> wrote: > >> > >> Hello Kamil et all, > >> > >> When I build this code: > >> > >> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder() > >> .setProperties(kafkaProps) > >> .setProperty("ssl.truststore.type",trustStoreType) > >> .setProperty("ssl.truststore.password",trustStorePassword) > >> .setProperty("ssl.truststore.location",trustStoreLocation) > >> .setProperty("security.protocol",securityProtocol) > >> .setProperty("partition.discovery.interval.ms", > partitionDiscoveryIntervalMs) > >> .setProperty("commit.offsets.on.checkpoint", > commitOffsetsOnCheckpoint) > >> .setGroupId(groupId) > >> .setTopics(kafkaInputTopic) > >> .setDeserializer(new JSONKeyValueDeserializationSchema(false)) > >> > > .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) > >> .build(); > >> > >> > >> I get: > >> This error: > >> > >> error: incompatible types: > org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema > cannot be converted to > org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode> > >> .setDeserializer(new > JSONKeyValueDeserializationSchema(false)) > >> > >> > >> What am I doing wrong? > >> As per the documentation JSONKeyValueDeserializationSchema returns an > ObjectNode. > >> > >> Regards Hans-Peter > >> > >> > >> > >> Op vr 14 jan. 2022 om 20:32 schreef Kamil ty <kamilt...@gmail.com>: > >>> > >>> Hello Hans, > >>> > >>> As far as I know the JSONKeyValueDeserializationSchema returns a > Jackson ObjectNode. Below I have included an example based on Flink stable > documentation. > >>> > >>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder() > >>> .setBootstrapServers(brokers) > >>> .setTopics("input-topic") > >>> .setGroupId("my-group") > >>> .setStartingOffsets(OffsetsInitializer.earliest()) > >>> .setDeserializer(new JSONKeyValueDeserializationSchema(false)) > >>> .build(); > >>> > >>> DataStream<ObjectNode> ds = env.fromSource(source, > WatermarkStrategy.noWatermarks(), "Kafka Source"); > >>> // Below we access the JSON field stored in the > ObjectNode. > >>> DataStream<String> processedDs = ds.map(record -> > record.get("value").get("my-field").asText()); > >>> > >>> It is also possible to implement your own deserialization schema that > for eg. could turn each record into a POJO. You can do this by implementing > the KafkaDeserializationSchema (Flink : 1.14-SNAPSHOT API) (apache.org). > If you are only interested in the value of the Kafka record, you can also > extend the AbstractDeserializationSchema (Flink : 1.14-SNAPSHOT API) ( > apache.org) and use .setValueOnlyDeserializer(new CustomDeserializer()). > There is also a different API that you could use for this which is > specified here: KafkaSourceBuilder (Flink : 1.14-SNAPSHOT API) (apache.org). > Although the customDeserializer will be the same for older Flink versions, > the Kafka Source has appeared recently, to learn about the previous kafka > source (FlinkKafkaConsumer) see: Kafka | Apache Flink. > >>> > >>> Best Regards > >>> Kamil > >>> > >>> On Fri, 14 Jan 2022 at 18:37, HG <hanspeter.sl...@gmail.com> wrote: > >>>> > >>>> Hi, > >>>> > >>>> Before starting programming myself I'd like to know whether there are > good examples with deserialization of JSON that I can borrow. > >>>> The structure of the JSON is nested with multiple levels. > >>>> > >>>> Any references? > >>>> > >>>> 'better well stolen than badly invented myself' we'd say in Dutch😁 > >>>> > >>>> Regards Hans > > > > -- > Cheers, > Aeden > > GitHub: https://github.com/aedenj > Linked In: http://www.linkedin.com/in/aedenjameson > Blah Blah Blah: http://www.twitter.com/daliful >