Hi, 

Your usage looks good to me, but could you provide the exception (if any) or 
the unexpected behavior you met after starting the job? It’s difficult to debug 
with only these configurations. 

Best regards, 

Qingsheng

> On Mar 18, 2022, at 01:04, HG <hanspeter.sl...@gmail.com> wrote:
> 
> Hi Matthias,
> 
> It should be probably be like this:
> 
> Properties SinkkafkaProps  = new Properties();
> SinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> outputBrokers);
> SinkkafkaProps.setProperty("ssl.truststore.type", trustStoreType);
> SinkkafkaProps.setProperty("ssl.truststore.location", trustStoreLocation);
> SinkkafkaProps.setProperty("partition.discovery.interval.ms", 
> partitionDiscoveryIntervalMs);
> SinkkafkaProps.setProperty("commit.offsets.on.checkpoint", 
> commitOffsetsOnCheckpoint);
> 
> 
> KafkaSink<String> kSink = KafkaSink.<String>builder()
>         .setBootstrapServers(outputBrokers)
>         .setKafkaProducerConfig(SinkkafkaProps)
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                 .setTopic(kafkaOutputTopic)
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .build()
>         )
>         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .build();
> 
> Op do 17 mrt. 2022 om 17:29 schreef Matthias Pohl <matth...@ververica.com>:
> Could you share more details on what's not working? Is the 
> ssl.trustore.location accessible from the Flink nodes?
> 
> Matthias
> 
> On Thu, Mar 17, 2022 at 4:00 PM HG <hanspeter.sl...@gmail.com> wrote:
> Hi all,
> I am probably not the smartest but I cannot find how to set ssl-properties 
> for a Kafka Sink. 
> My assumption was that it would be just like the Kafka Consumer
> 
> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>         .setProperties(kafkaProps)
>         .setProperty("ssl.truststore.type", trustStoreType)
>         .setProperty("ssl.truststore.password", trustStorePassword)
>         .setProperty("ssl.truststore.location", trustStoreLocation)
>         .setProperty("security.protocol", securityProtocol)
>         .setProperty("partition.discovery.interval.ms", 
> partitionDiscoveryIntervalMs)
>         .setProperty("commit.offsets.on.checkpoint", 
> commitOffsetsOnCheckpoint)
>         .setGroupId(inputGroupId)
>         .setClientIdPrefix(clientId)
>         .setTopics(kafkaInputTopic)
>         .setDeserializer(KafkaRecordDeserializationSchema.of(new 
> JSONKeyValueDeserializationSchema(fetchMetadata)))
>         
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>         .build();
> 
> But that seems not to be the case.
> 
> Any quick pointers?
> 
> Regards Hans-Peter

Reply via email to