Re: how to set kafka sink ssl properties

2022-03-22 Thread HG
Hello Matthias and others

I am trying to configure a Kafka Sink with SSL properties as shown further
below.

But in the logs I see warnings:

2022-03-21 12:30:17,108 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'group.id' was supplied but isn't a known config.
2022-03-21 12:30:17,109 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'partition.discovery.interval.ms' was supplied but isn't a
known config.
2022-03-21 12:30:17,109 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'auto.commit.interval.ms' was supplied but isn't a known
config.
2022-03-21 12:30:17,109 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'ssl.truststore.type' was supplied but isn't a known config.
2022-03-21 12:30:17,111 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'ssl.truststore.location' was supplied but isn't a known
config.
2022-03-21 12:30:17,115 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'ssl.truststore.password' was supplied but isn't a known
config.
2022-03-21 12:30:17,115 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'auto.offset.reset' was supplied but isn't a known config.

It seems that they are bogus.

Regards Hans-Peter

Properties sinkkafkaProps  = new Properties();
sinkkafkaProps.setProperty("ssl.truststore.type", outputTrustStoreType);
sinkkafkaProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
sinkkafkaProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
sinkkafkaProps.setProperty("security.protocol", outputSecurityProtocol);
sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
sinkkafkaProps.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs);
sinkkafkaProps.setProperty("commit.offsets.on.checkpoint",
commitOffsetsOnCheckpoint);

if (kafkaOutputDisabled.equals("false")) {
KafkaSink kSink = KafkaSink.builder()
.setBootstrapServers(outputBrokers)
.setKafkaProducerConfig(sinkkafkaProps)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(kafkaOutputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();



Op do 17 mrt. 2022 om 17:29 schreef Matthias Pohl :

> Could you share more details on what's not working? Is the
> ssl.trustore.location accessible from the Flink nodes?
>
> Matthias
>
> On Thu, Mar 17, 2022 at 4:00 PM HG  wrote:
>
>> Hi all,
>> I am probably not the smartest but I cannot find how to set
>> ssl-properties for a Kafka Sink.
>> My assumption was that it would be just like the Kafka Consumer
>>
>> KafkaSource source = KafkaSource.builder()
>> .setProperties(kafkaProps)
>> .setProperty("ssl.truststore.type", trustStoreType)
>> .setProperty("ssl.truststore.password", trustStorePassword)
>> .setProperty("ssl.truststore.location", trustStoreLocation)
>> .setProperty("security.protocol", securityProtocol)
>> .setProperty("partition.discovery.interval.ms", 
>> partitionDiscoveryIntervalMs)
>> .setProperty("commit.offsets.on.checkpoint", 
>> commitOffsetsOnCheckpoint)
>> .setGroupId(inputGroupId)
>> .setClientIdPrefix(clientId)
>> .setTopics(kafkaInputTopic)
>> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
>> JSONKeyValueDeserializationSchema(fetchMetadata)))
>> 
>> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>> .build();
>>
>>
>> But that seems not to be the case.
>>
>> Any quick pointers?
>>
>> Regards Hans-Peter
>>
>


Re: how to set kafka sink ssl properties

2022-03-18 Thread Qingsheng Ren
Hi, 

Your usage looks good to me, but could you provide the exception (if any) or 
the unexpected behavior you met after starting the job? It’s difficult to debug 
with only these configurations. 

Best regards, 

Qingsheng

> On Mar 18, 2022, at 01:04, HG  wrote:
> 
> Hi Matthias,
> 
> It should be probably be like this:
> 
> Properties SinkkafkaProps  = new Properties();
> SinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> outputBrokers);
> SinkkafkaProps.setProperty("ssl.truststore.type", trustStoreType);
> SinkkafkaProps.setProperty("ssl.truststore.location", trustStoreLocation);
> SinkkafkaProps.setProperty("partition.discovery.interval.ms", 
> partitionDiscoveryIntervalMs);
> SinkkafkaProps.setProperty("commit.offsets.on.checkpoint", 
> commitOffsetsOnCheckpoint);
> 
> 
> KafkaSink kSink = KafkaSink.builder()
> .setBootstrapServers(outputBrokers)
> .setKafkaProducerConfig(SinkkafkaProps)
> .setRecordSerializer(KafkaRecordSerializationSchema.builder()
> .setTopic(kafkaOutputTopic)
> .setValueSerializationSchema(new SimpleStringSchema())
> .build()
> )
> .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
> .build();
> 
> Op do 17 mrt. 2022 om 17:29 schreef Matthias Pohl :
> Could you share more details on what's not working? Is the 
> ssl.trustore.location accessible from the Flink nodes?
> 
> Matthias
> 
> On Thu, Mar 17, 2022 at 4:00 PM HG  wrote:
> Hi all,
> I am probably not the smartest but I cannot find how to set ssl-properties 
> for a Kafka Sink. 
> My assumption was that it would be just like the Kafka Consumer
> 
> KafkaSource source = KafkaSource.builder()
> .setProperties(kafkaProps)
> .setProperty("ssl.truststore.type", trustStoreType)
> .setProperty("ssl.truststore.password", trustStorePassword)
> .setProperty("ssl.truststore.location", trustStoreLocation)
> .setProperty("security.protocol", securityProtocol)
> .setProperty("partition.discovery.interval.ms", 
> partitionDiscoveryIntervalMs)
> .setProperty("commit.offsets.on.checkpoint", 
> commitOffsetsOnCheckpoint)
> .setGroupId(inputGroupId)
> .setClientIdPrefix(clientId)
> .setTopics(kafkaInputTopic)
> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
> JSONKeyValueDeserializationSchema(fetchMetadata)))
> 
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> .build();
> 
> But that seems not to be the case.
> 
> Any quick pointers?
> 
> Regards Hans-Peter



Re: how to set kafka sink ssl properties

2022-03-17 Thread HG
Hi Matthias,

It should be probably be like this:

Properties SinkkafkaProps  = new Properties();
SinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
outputBrokers);

SinkkafkaProps.setProperty("ssl.truststore.type", trustStoreType);
SinkkafkaProps.setProperty("ssl.truststore.location", trustStoreLocation);
SinkkafkaProps.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs);
SinkkafkaProps.setProperty("commit.offsets.on.checkpoint",
commitOffsetsOnCheckpoint);


KafkaSink kSink = KafkaSink.builder()
.setBootstrapServers(outputBrokers)
.setKafkaProducerConfig(SinkkafkaProps)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(kafkaOutputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();


Op do 17 mrt. 2022 om 17:29 schreef Matthias Pohl :

> Could you share more details on what's not working? Is the
> ssl.trustore.location accessible from the Flink nodes?
>
> Matthias
>
> On Thu, Mar 17, 2022 at 4:00 PM HG  wrote:
>
>> Hi all,
>> I am probably not the smartest but I cannot find how to set
>> ssl-properties for a Kafka Sink.
>> My assumption was that it would be just like the Kafka Consumer
>>
>> KafkaSource source = KafkaSource.builder()
>> .setProperties(kafkaProps)
>> .setProperty("ssl.truststore.type", trustStoreType)
>> .setProperty("ssl.truststore.password", trustStorePassword)
>> .setProperty("ssl.truststore.location", trustStoreLocation)
>> .setProperty("security.protocol", securityProtocol)
>> .setProperty("partition.discovery.interval.ms", 
>> partitionDiscoveryIntervalMs)
>> .setProperty("commit.offsets.on.checkpoint", 
>> commitOffsetsOnCheckpoint)
>> .setGroupId(inputGroupId)
>> .setClientIdPrefix(clientId)
>> .setTopics(kafkaInputTopic)
>> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
>> JSONKeyValueDeserializationSchema(fetchMetadata)))
>> 
>> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>> .build();
>>
>>
>> But that seems not to be the case.
>>
>> Any quick pointers?
>>
>> Regards Hans-Peter
>>
>


Re: how to set kafka sink ssl properties

2022-03-17 Thread Matthias Pohl
Could you share more details on what's not working? Is the
ssl.trustore.location accessible from the Flink nodes?

Matthias

On Thu, Mar 17, 2022 at 4:00 PM HG  wrote:

> Hi all,
> I am probably not the smartest but I cannot find how to set ssl-properties
> for a Kafka Sink.
> My assumption was that it would be just like the Kafka Consumer
>
> KafkaSource source = KafkaSource.builder()
> .setProperties(kafkaProps)
> .setProperty("ssl.truststore.type", trustStoreType)
> .setProperty("ssl.truststore.password", trustStorePassword)
> .setProperty("ssl.truststore.location", trustStoreLocation)
> .setProperty("security.protocol", securityProtocol)
> .setProperty("partition.discovery.interval.ms", 
> partitionDiscoveryIntervalMs)
> .setProperty("commit.offsets.on.checkpoint", 
> commitOffsetsOnCheckpoint)
> .setGroupId(inputGroupId)
> .setClientIdPrefix(clientId)
> .setTopics(kafkaInputTopic)
> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
> JSONKeyValueDeserializationSchema(fetchMetadata)))
> 
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> .build();
>
>
> But that seems not to be the case.
>
> Any quick pointers?
>
> Regards Hans-Peter
>


how to set kafka sink ssl properties

2022-03-17 Thread HG
Hi all,
I am probably not the smartest but I cannot find how to set ssl-properties
for a Kafka Sink.
My assumption was that it would be just like the Kafka Consumer

KafkaSource source = KafkaSource.builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type", trustStoreType)
.setProperty("ssl.truststore.password", trustStorePassword)
.setProperty("ssl.truststore.location", trustStoreLocation)
.setProperty("security.protocol", securityProtocol)
.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
.setGroupId(inputGroupId)
.setClientIdPrefix(clientId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new
JSONKeyValueDeserializationSchema(fetchMetadata)))

.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();


But that seems not to be the case.

Any quick pointers?

Regards Hans-Peter