Hi Kamil, Aeden and others,

This was already answered. For the record, this was the complete solution:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(inputGroupId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();
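
For anyone finding this later: consuming from the source above then looks
roughly like this (following Kamil's earlier example further down the
thread; the field name "my-field" is only an illustration):

DataStream<ObjectNode> ds = env.fromSource(source,
        WatermarkStrategy.noWatermarks(), "Kafka Source");

// JSONKeyValueDeserializationSchema wraps each record in an ObjectNode
// with "key" and "value" fields (plus "metadata" when fetchMetadata is
// true), so payload fields are read through "value".
DataStream<String> values = ds.map(record ->
        record.get("value").get("my-field").asText());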


Thanks a lot anyway.


On Thu 3 Mar 2022 at 20:46, Aeden Jameson <aeden.jame...@gmail.com> wrote:

> I believe you can solve this issue with:
>
> .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))
>
>
> On Thu, Mar 3, 2022 at 8:07 AM Kamil ty <kamilt...@gmail.com> wrote:
> >
> > Hello,
> >
> > Sorry for the late reply. I have checked the issue and it is indeed a
> > type issue, as the exception suggests. The
> > JSONKeyValueDeserializationSchema included in Flink implements
> > KafkaDeserializationSchema, but the KafkaSource builder's
> > .setDeserializer method expects a KafkaRecordDeserializationSchema. The
> > JSONKeyValueDeserializationSchema might have been left over from an
> > older Flink version.
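> >
> > In other words, the mismatch is between these two types (signatures
> > paraphrased from the Flink 1.14 API, not copied verbatim):
> >
> > // What the builder accepts:
> > KafkaSourceBuilder<OUT> setDeserializer(KafkaRecordDeserializationSchema<OUT> deserializer);
> >
> > // What the JSON schema actually implements:
> > public class JSONKeyValueDeserializationSchema
> >         implements KafkaDeserializationSchema<ObjectNode> { ... }
> >
> > // The bridge between the two:
> > KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false));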
> >
> > My recommendation would be to implement your own
> > JSONKeyValueDeserializationSchema that implements DeserializationSchema.
> > You should even be able to copy the implementation from the
> > JSONKeyValueDeserializationSchema included in Flink and change
> > KafkaDeserializationSchema to DeserializationSchema.
> >
> > If you run into issues implementing this, please let me know and I will
> > provide you with the code.
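> >
> > A rough, untested sketch of that idea (the class name is made up; note
> > that a plain DeserializationSchema only sees the value bytes, so the
> > key and record metadata are not available through it):
> >
> > import java.io.IOException;
> > import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
> > import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
> > import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
> >
> > public class JsonValueDeserializationSchema
> >         extends AbstractDeserializationSchema<ObjectNode> {
> >
> >     private transient ObjectMapper mapper;
> >
> >     @Override
> >     public ObjectNode deserialize(byte[] message) throws IOException {
> >         // ObjectMapper is not serializable, so create it lazily.
> >         if (mapper == null) {
> >             mapper = new ObjectMapper();
> >         }
> >         // Parse the raw value bytes into a Jackson tree.
> >         return (ObjectNode) mapper.readTree(message);
> >     }
> > }
> >
> > You would then wire it in with
> > .setValueOnlyDeserializer(new JsonValueDeserializationSchema()) on the
> > KafkaSource builder.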
> >
> > Best regards
> > Kamil
> >
> > On Mon, 7 Feb 2022, 15:15 HG, <hanspeter.sl...@gmail.com> wrote:
> >>
> >> Hello Kamil et all,
> >>
> >> When I build this code:
> >>
> >> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
> >>         .setProperties(kafkaProps)
> >>         .setProperty("ssl.truststore.type", trustStoreType)
> >>         .setProperty("ssl.truststore.password", trustStorePassword)
> >>         .setProperty("ssl.truststore.location", trustStoreLocation)
> >>         .setProperty("security.protocol", securityProtocol)
> >>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
> >>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
> >>         .setGroupId(groupId)
> >>         .setTopics(kafkaInputTopic)
> >>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> >>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> >>         .build();
> >>
> >>
> >> I get this error:
> >>
> >> error: incompatible types:
> >> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
> >> cannot be converted to
> >> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
> >>                 .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> >>
> >>
> >> What am I doing wrong?
> >> As per the documentation, JSONKeyValueDeserializationSchema returns an
> >> ObjectNode.
> >>
> >> Regards Hans-Peter
> >>
> >>
> >>
> >> On Fri 14 Jan 2022 at 20:32, Kamil ty <kamilt...@gmail.com> wrote:
> >>>
> >>> Hello Hans,
> >>>
> >>> As far as I know, the JSONKeyValueDeserializationSchema returns a
> >>> Jackson ObjectNode. Below is an example based on the stable Flink
> >>> documentation.
> >>>
> >>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
> >>>         .setBootstrapServers(brokers)
> >>>         .setTopics("input-topic")
> >>>         .setGroupId("my-group")
> >>>         .setStartingOffsets(OffsetsInitializer.earliest())
> >>>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> >>>         .build();
> >>>
> >>> DataStream<ObjectNode> ds = env.fromSource(source,
> >>>         WatermarkStrategy.noWatermarks(), "Kafka Source");
> >>>
> >>> // Below we access the JSON field stored in the ObjectNode.
> >>> DataStream<String> processedDs = ds.map(record ->
> >>>         record.get("value").get("my-field").asText());
> >>>
> >>> It is also possible to implement your own deserialization schema that,
> >>> for example, turns each record into a POJO. You can do this by
> >>> implementing KafkaDeserializationSchema (see the Flink 1.14 API docs).
> >>> If you are only interested in the value of the Kafka record, you can
> >>> also extend AbstractDeserializationSchema and use
> >>> .setValueOnlyDeserializer(new CustomDeserializer()); a rough sketch of
> >>> this follows below. There is also a different API you could use for
> >>> this, described in the KafkaSourceBuilder javadoc. Although the custom
> >>> deserializer will be the same for older Flink versions, the
> >>> KafkaSource itself is recent; for the previous Kafka source
> >>> (FlinkKafkaConsumer), see the Kafka connector page in the Flink
> >>> documentation.
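> >>>
> >>> A minimal, untested sketch of the value-only POJO variant (the POJO
> >>> and its fields are invented for illustration; imports as in the
> >>> builder example above plus Jackson's ObjectMapper and
> >>> AbstractDeserializationSchema):
> >>>
> >>> // Hypothetical POJO matching the JSON value payload.
> >>> public class MyEvent {
> >>>     public String myField;  // invented field name
> >>>     public long eventTime;  // invented field name
> >>> }
> >>>
> >>> public class MyEventDeserializer
> >>>         extends AbstractDeserializationSchema<MyEvent> {
> >>>
> >>>     private transient ObjectMapper mapper;
> >>>
> >>>     @Override
> >>>     public MyEvent deserialize(byte[] message) throws IOException {
> >>>         if (mapper == null) {
> >>>             mapper = new ObjectMapper();
> >>>         }
> >>>         // Map the JSON value bytes directly onto the POJO.
> >>>         return mapper.readValue(message, MyEvent.class);
> >>>     }
> >>> }
> >>>
> >>> KafkaSource<MyEvent> source = KafkaSource.<MyEvent>builder()
> >>>         .setBootstrapServers(brokers)
> >>>         .setTopics("input-topic")
> >>>         .setGroupId("my-group")
> >>>         .setValueOnlyDeserializer(new MyEventDeserializer())
> >>>         .build();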
> >>>
> >>> Best Regards
> >>> Kamil
> >>>
> >>> On Fri, 14 Jan 2022 at 18:37, HG <hanspeter.sl...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> Before starting programming myself I'd like to know whether there are
> good examples with deserialization of JSON that I can borrow.
> >>>> The structure of the JSON is nested with multiple levels.
> >>>>
> >>>> Any references?
> >>>>
> >>>> 'better well stolen than badly invented myself' we'd say in Dutch😁
> >>>>
> >>>> Regards Hans
>
>
>
> --
> Cheers,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
> Blah Blah Blah: http://www.twitter.com/daliful
>
