Arseniy Tashoyan created FLINK-24850:
----------------------------------------

             Summary: KafkaSinkBuilder: NullPointerException if bootstrap.servers are defined via properties
                 Key: FLINK-24850
                 URL: https://issues.apache.org/jira/browse/FLINK-24850
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.14.0
            Reporter: Arseniy Tashoyan


Let's create a KafkaSink:

{code:scala}
import java.util.Properties
import org.apache.flink.connector.kafka.sink.KafkaSink

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
val kafkaSink = KafkaSink.builder[String]()
      .setKafkaProducerConfig(props)
      ...// other settings
      .build()
{code}

It fails:

{code:none}
java.lang.NullPointerException
        at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
        at org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.build(KafkaSinkBuilder.java:192)
{code}

_KafkaSinkBuilder.build()_ fails because it expects the bootstrap servers to 
be set via _KafkaSinkBuilder.setBootstrapServers()_. It ignores the 
_"bootstrap.servers"_ entry in the properties.

Meanwhile, _KafkaSourceBuilder_ allows setting bootstrap servers via properties, 
not necessarily by calling _KafkaSourceBuilder.setBootstrapServers()_. It 
would be good to enable the same contract for _KafkaSinkBuilder_.
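
Until the sink builder accepts the property, one possible workaround is to read the value out of the properties and pass it explicitly. This is a minimal sketch, not a tested fix: the helper name _bootstrapServersFrom_ is hypothetical and not part of Flink, and the builder calls are shown only as comments.

{code:scala}
import java.util.Properties

// Hypothetical helper (not part of Flink): reads "bootstrap.servers"
// from the properties, treating a missing or blank value as absent.
def bootstrapServersFrom(props: Properties): Option[String] =
  Option(props.getProperty("bootstrap.servers")).map(_.trim).filter(_.nonEmpty)

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")

// Fail early with a descriptive message instead of a bare NullPointerException.
val servers = bootstrapServersFrom(props)
  .getOrElse(throw new IllegalArgumentException("bootstrap.servers is not set"))

// The value would then be passed explicitly next to the properties:
//   KafkaSink.builder[String]()
//     .setBootstrapServers(servers)
//     .setKafkaProducerConfig(props)
//     ...// other settings
//     .build()
{code}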

It is convenient to keep all Kafka-specific settings in a config file:

{code:none}
    settings {
      "bootstrap.servers" = "broker1:9092,broker2:9092"
      "group.id" = "retail-streaming"
      "auto.offset.reset" = "latest"
    }
{code}

With the same contract, such settings could be passed as-is to 
_KafkaSinkBuilder.setKafkaProducerConfig()_, with no special treatment for 
_"bootstrap.servers"_.
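
As a rough sketch of that flow, assume the settings block has already been loaded into a plain _Map[String, String]_ (the loading step depends on the config library and is not shown). The helper name _toProperties_ is hypothetical.

{code:scala}
import java.util.Properties

// Hypothetical helper: copies settings loaded from a config file into Properties.
def toProperties(settings: Map[String, String]): Properties = {
  val props = new Properties()
  settings.foreach { case (key, value) => props.setProperty(key, value) }
  props
}

// Values taken from the settings block above.
val settings = Map(
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "group.id" -> "retail-streaming",
  "auto.offset.reset" -> "latest"
)
val props = toProperties(settings)

// With the proposed contract, props would go unchanged to
// KafkaSinkBuilder.setKafkaProducerConfig(props).
{code}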



