Hello Kamil et al.,

When I build this code:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(new JSONKeyValueDeserializationSchema(false))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();


I get this error:

error: incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
                .setDeserializer(new JSONKeyValueDeserializationSchema(false))


What am I doing wrong?
As per the documentation JSONKeyValueDeserializationSchema returns an
ObjectNode.
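
A possible reading of the error, offered as a sketch rather than a confirmed
diagnosis: the compiler is not complaining about ObjectNode but about the
schema type. setDeserializer() expects a KafkaRecordDeserializationSchema,
while JSONKeyValueDeserializationSchema implements the older
KafkaDeserializationSchema interface. As far as I can tell from the 1.14 API,
the static factory KafkaRecordDeserializationSchema.of(...) wraps the older
kind, so .setDeserializer(KafkaRecordDeserializationSchema.of(new
JSONKeyValueDeserializationSchema(false))) may be what is needed. The
self-contained Java below only illustrates that adapter idea. LegacySchema,
RecordSchema and readAll are hypothetical stand-ins invented for this sketch,
not Flink classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DeserializerAdapterSketch {

    // Hypothetical stand-in for the older interface shape:
    // one record in, one value returned.
    interface LegacySchema<T> {
        T deserialize(byte[] key, byte[] value);
    }

    // Hypothetical stand-in for the newer interface shape:
    // values are emitted through a collector-like callback.
    interface RecordSchema<T> {
        void deserialize(byte[] key, byte[] value, Consumer<T> out);

        // Static factory that adapts the legacy shape to the new one.
        static <T> RecordSchema<T> of(LegacySchema<T> legacy) {
            return (key, value, out) -> out.accept(legacy.deserialize(key, value));
        }
    }

    // Hypothetical consumer that, like a builder method, accepts only the new shape.
    static <T> List<T> readAll(RecordSchema<T> schema, byte[][] values) {
        List<T> result = new ArrayList<>();
        for (byte[] value : values) {
            schema.deserialize(null, value, result::add);
        }
        return result;
    }

    static List<String> demo() {
        LegacySchema<String> legacy =
                (key, value) -> new String(value, StandardCharsets.UTF_8);
        byte[][] records = {
            "a".getBytes(StandardCharsets.UTF_8),
            "b".getBytes(StandardCharsets.UTF_8)
        };
        // readAll(legacy, records) would fail to compile with "incompatible types",
        // because LegacySchema is not a RecordSchema. Wrapping it resolves that.
        return readAll(RecordSchema.of(legacy), records);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The design point is that the two interfaces are unrelated types even though
both produce the same element type, so an explicit wrapper is required.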

Regards Hans-Peter



On Fri, 14 Jan 2022 at 20:32, Kamil ty <kamilt...@gmail.com> wrote:

> Hello Hans,
>
> As far as I know the JSONKeyValueDeserializationSchema returns a Jackson
> ObjectNode. Below I have included an example based on Flink stable
> documentation.
>
> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
> .setBootstrapServers(brokers)
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> .build();
>
> DataStream<ObjectNode> ds = env.fromSource(source,
> WatermarkStrategy.noWatermarks(), "Kafka Source");
> // Below we access the JSON field stored in the ObjectNode.
> DataStream<String> processedDs = ds.map(record ->
> record.get("value").get("my-field").asText());
>
> It is also possible to implement your own deserialization schema that,
> e.g., turns each record into a POJO. You can do this by implementing
> KafkaDeserializationSchema
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.html>.
> If you are only interested in the value of the Kafka record, you can also
> extend AbstractDeserializationSchema
> <https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.html>
> and use .setValueOnlyDeserializer(new CustomDeserializer()). There is also a
> different API that you could use for this, which is described here:
> KafkaSourceBuilder
> <https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html>.
> The custom deserializer will be the same for older Flink versions, but
> KafkaSource was introduced only recently. To learn about the previous Kafka
> source (FlinkKafkaConsumer), see Kafka | Apache Flink
> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction>.
>
> Best Regards
> Kamil
>
> On Fri, 14 Jan 2022 at 18:37, HG <hanspeter.sl...@gmail.com> wrote:
>
>> Hi,
>>
>> Before starting programming myself I'd like to know whether there are
>> good examples with deserialization of JSON that I can borrow.
>> The structure of the JSON is nested with multiple levels.
>>
>> Any references?
>>
>> 'better well stolen than badly invented myself' we'd say in Dutch😁
>>
>> Regards Hans
>>
>
