Re: how to set kafka sink ssl properties
Hello Matthias and others,

I am trying to configure a Kafka Sink with SSL properties as shown further below.
But in the logs I see warnings:

2022-03-21 12:30:17,108 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'group.id' was supplied but isn't a known config.
2022-03-21 12:30:17,109 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
2022-03-21 12:30:17,109 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'auto.commit.interval.ms' was supplied but isn't a known config.
2022-03-21 12:30:17,109 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'ssl.truststore.type' was supplied but isn't a known config.
2022-03-21 12:30:17,111 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'ssl.truststore.location' was supplied but isn't a known config.
2022-03-21 12:30:17,115 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'ssl.truststore.password' was supplied but isn't a known config.
2022-03-21 12:30:17,115 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'auto.offset.reset' was supplied but isn't a known config.

It seems that they are bogus.

Regards,
Hans-Peter

    Properties sinkkafkaProps = new Properties();
    sinkkafkaProps.setProperty("ssl.truststore.type", outputTrustStoreType);
    sinkkafkaProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
    sinkkafkaProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
    sinkkafkaProps.setProperty("security.protocol", outputSecurityProtocol);
    sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
    sinkkafkaProps.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs);
    sinkkafkaProps.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint);

    if (kafkaOutputDisabled.equals("false")) {
        KafkaSink kSink = KafkaSink.builder()
            .setBootstrapServers(outputBrokers)
            .setKafkaProducerConfig(sinkkafkaProps)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(kafkaOutputTopic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
            )
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    }

On Thu, Mar 17, 2022 at 17:29, Matthias Pohl wrote:

> Could you share more details on what's not working? Is the
> ssl.truststore.location accessible from the Flink nodes?
>
> Matthias
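A note on the sink properties in the message above: partition.discovery.interval.ms and commit.offsets.on.checkpoint are KafkaSource options, and group.id, auto.offset.reset and auto.commit.interval.ms are consumer settings, so a Kafka producer has no use for any of them. The following is a minimal sketch, assuming plain java.util.Properties, of filtering such keys out before the properties are passed to setKafkaProducerConfig. The class SinkProps and the method forProducer are hypothetical names chosen for illustration, and the key list is only the set of keys that appear in this thread. Whether this changes the AdminClientConfig warnings is not established here.

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical helper: not part of Flink or Kafka.
final class SinkProps {

    // Keys seen in this thread that belong to KafkaSource or to the
    // Kafka consumer, not to the producer used by a sink.
    static final Set<String> SOURCE_ONLY_KEYS = Set.of(
            "partition.discovery.interval.ms",
            "commit.offsets.on.checkpoint",
            "group.id",
            "auto.offset.reset",
            "auto.commit.interval.ms");

    // Returns a copy of the given properties without the source-only keys.
    static Properties forProducer(Properties all) {
        Properties out = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (!SOURCE_ONLY_KEYS.contains(key)) {
                out.setProperty(key, all.getProperty(key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties all = new Properties();
        all.setProperty("security.protocol", "SSL");
        all.setProperty("ssl.truststore.type", "JKS");
        all.setProperty("max.request.size", "1048576");
        all.setProperty("partition.discovery.interval.ms", "60000");
        all.setProperty("commit.offsets.on.checkpoint", "true");

        // Only the security and producer keys remain.
        System.out.println(forProducer(all).stringPropertyNames());
    }
}
```

The result of forProducer could then be handed to setKafkaProducerConfig in place of the unfiltered properties.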
Re: how to set kafka sink ssl properties
Hi,

Your usage looks good to me, but could you provide the exception (if any) or the unexpected behavior you observed after starting the job? It’s difficult to debug with only these configurations.

Best regards,
Qingsheng

> On Mar 18, 2022, at 01:04, HG wrote:
>
> Hi Matthias,
>
> It should probably be like this:
>
>     Properties SinkkafkaProps = new Properties();
>     SinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outputBrokers);
>     SinkkafkaProps.setProperty("ssl.truststore.type", trustStoreType);
>     SinkkafkaProps.setProperty("ssl.truststore.location", trustStoreLocation);
>     SinkkafkaProps.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs);
>     SinkkafkaProps.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint);
>
>     KafkaSink kSink = KafkaSink.builder()
>         .setBootstrapServers(outputBrokers)
>         .setKafkaProducerConfig(SinkkafkaProps)
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>             .setTopic(kafkaOutputTopic)
>             .setValueSerializationSchema(new SimpleStringSchema())
>             .build()
>         )
>         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .build();
Re: how to set kafka sink ssl properties
Hi Matthias,

It should probably be like this:

    Properties SinkkafkaProps = new Properties();
    SinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outputBrokers);
    SinkkafkaProps.setProperty("ssl.truststore.type", trustStoreType);
    SinkkafkaProps.setProperty("ssl.truststore.location", trustStoreLocation);
    SinkkafkaProps.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs);
    SinkkafkaProps.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint);

    KafkaSink kSink = KafkaSink.builder()
        .setBootstrapServers(outputBrokers)
        .setKafkaProducerConfig(SinkkafkaProps)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic(kafkaOutputTopic)
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

On Thu, Mar 17, 2022 at 17:29, Matthias Pohl wrote:

> Could you share more details on what's not working? Is the
> ssl.truststore.location accessible from the Flink nodes?
>
> Matthias
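Compared with the KafkaSource configuration from the original question, the proposed SinkkafkaProps sets neither security.protocol nor ssl.truststore.password. Kafka clients default security.protocol to PLAINTEXT, so the truststore settings alone do not switch the producer to SSL. Below is a small sketch, using only java.util.Properties, that lists which security keys are set for the source but absent for the sink. The class SslKeyDiff and its method are hypothetical names, and the placeholder values are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: not part of Flink or Kafka.
final class SslKeyDiff {

    // The security-related keys used in this thread.
    static final List<String> SECURITY_KEYS = List.of(
            "security.protocol",
            "ssl.truststore.type",
            "ssl.truststore.location",
            "ssl.truststore.password");

    // Returns the security keys that the source sets but the sink does not.
    static List<String> missingInSink(Properties source, Properties sink) {
        List<String> missing = new ArrayList<>();
        for (String key : SECURITY_KEYS) {
            if (source.getProperty(key) != null && sink.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder values; only the presence of each key matters here.
        Properties source = new Properties();
        source.setProperty("security.protocol", "SSL");
        source.setProperty("ssl.truststore.type", "JKS");
        source.setProperty("ssl.truststore.location", "/path/to/truststore.jks");
        source.setProperty("ssl.truststore.password", "secret");

        Properties sink = new Properties();
        sink.setProperty("ssl.truststore.type", "JKS");
        sink.setProperty("ssl.truststore.location", "/path/to/truststore.jks");

        // prints [security.protocol, ssl.truststore.password]
        System.out.println(missingInSink(source, sink));
    }
}
```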
Re: how to set kafka sink ssl properties
Could you share more details on what's not working? Is the ssl.truststore.location accessible from the Flink nodes?

Matthias

On Thu, Mar 17, 2022 at 4:00 PM HG wrote:

> Hi all,
>
> I am probably not the smartest but I cannot find how to set SSL properties
> for a Kafka Sink.
> My assumption was that it would be just like the Kafka Consumer:
>
>     KafkaSource source = KafkaSource.builder()
>         .setProperties(kafkaProps)
>         .setProperty("ssl.truststore.type", trustStoreType)
>         .setProperty("ssl.truststore.password", trustStorePassword)
>         .setProperty("ssl.truststore.location", trustStoreLocation)
>         .setProperty("security.protocol", securityProtocol)
>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>         .setGroupId(inputGroupId)
>         .setClientIdPrefix(clientId)
>         .setTopics(kafkaInputTopic)
>         .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>         .build();
>
> But that seems not to be the case.
>
> Any quick pointers?
>
> Regards,
> Hans-Peter
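One way to answer the accessibility question is to run a small check on each machine that hosts a Flink process. The sketch below uses only the JDK (java.nio.file and java.security.KeyStore) to test whether a truststore file is readable and can be opened with the given type and password. The class TruststoreCheck and its method isLoadable are hypothetical names. The check only covers the machine it runs on, and it says nothing about whether the broker certificates are actually trusted.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;

// Hypothetical helper: not part of Flink or Kafka.
final class TruststoreCheck {

    // Returns true if the file is readable and loads as a keystore of the
    // given type (for example "JKS" or "PKCS12") with the given password.
    static boolean isLoadable(String location, String type, char[] password) {
        Path path = Paths.get(location);
        if (!Files.isReadable(path)) {
            return false;
        }
        try (InputStream in = Files.newInputStream(path)) {
            KeyStore store = KeyStore.getInstance(type);
            store.load(in, password);
            return true;
        } catch (Exception e) {
            // Unknown type, wrong password, or corrupt file.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("usage: TruststoreCheck <location> <type> <password>");
            return;
        }
        System.out.println(isLoadable(args[0], args[1], args[2].toCharArray()));
    }
}
```

Running this with the same values that are passed as ssl.truststore.location, ssl.truststore.type and ssl.truststore.password gives a quick yes or no per node.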
how to set kafka sink ssl properties
Hi all,

I am probably not the smartest but I cannot find how to set SSL properties for a Kafka Sink.
My assumption was that it would be just like the Kafka Consumer:

    KafkaSource source = KafkaSource.builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(inputGroupId)
        .setClientIdPrefix(clientId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();

But that seems not to be the case.

Any quick pointers?

Regards,
Hans-Peter