The complete solution, for the record (so that others can benefit from it):

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();
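For completeness, a source built this way is typically attached to a streaming job roughly as follows. This is a minimal sketch, not part of the original thread: the environment setup, the "kafka-source" name, and the absence of watermarks are assumptions; `source` is the KafkaSource built above.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Attach the KafkaSource built above. WatermarkStrategy.noWatermarks() is
// sufficient when the job does not rely on event-time semantics.
DataStream<ObjectNode> stream = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "kafka-source");
```

With `fetchMetadata` set to true, each ObjectNode also carries a "metadata" field (topic, partition, offset) alongside "key" and "value".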


On Wed, Feb 9, 2022 at 09:46, HG <hanspeter.sl...@gmail.com> wrote:

> Sorry to have bothered everyone.
>
> This is the obvious solution:
>
> .setDeserializer(KafkaRecordDeserializationSchema.of(
>         new JSONKeyValueDeserializationSchema(false)))
>
>
> Regards Hans-Peter
>
>
> On Tue, Feb 8, 2022 at 21:56, Roman Khachatryan <ro...@apache.org> wrote:
>
>> Hi,
>>
>> setDeserializer() expects a KafkaRecordDeserializationSchema; the
>> JSONKeyValueDeserializationSchema you provided is not compatible with
>> it.
>> You can convert it using [1]
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema-
>>
>>
>> Regards,
>> Roman
>>
>> On Tue, Feb 8, 2022 at 5:43 PM HG <hanspeter.sl...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > When I build this code:
>> >
>> > KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>> >         .setProperties(kafkaProps)
>> >         .setProperty("ssl.truststore.type", trustStoreType)
>> >         .setProperty("ssl.truststore.password", trustStorePassword)
>> >         .setProperty("ssl.truststore.location", trustStoreLocation)
>> >         .setProperty("security.protocol", securityProtocol)
>> >         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>> >         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>> >         .setGroupId(groupId)
>> >         .setTopics(kafkaInputTopic)
>> >         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>> >         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>> >         .build();
>> >
>> >
>> > I get this error:
>> >
>> > error: incompatible types:
>> > org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
>> > cannot be converted to
>> > org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
>> >                 .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>> >
>> >
>> > What am I doing wrong?
>> > As per the documentation, JSONKeyValueDeserializationSchema returns an
>> > ObjectNode.
>> >
>> > Regards Hans
>> >
>>
>