Set this parameter:

    a1.sinks.k1.brokerList = <list of brokers>

instead of:

    a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
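For reference, a sketch of the corrected sink section, keeping the sink name, topic, and channel from the quoted config (the broker list below just reuses the two quickstart brokers from your question):

```properties
# Describe the sink (a1/k1 names taken from the original config)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# KafkaSinkUtil looks up the broker list under the "brokerList" key
# (BROKER_LIST_FLUME_KEY), not under "kafka.metadata.broker.list",
# which is why the context value was never picked up.
a1.sinks.k1.brokerList = localhost:9091,localhost:9092
a1.sinks.k1.topic = test
a1.sinks.k1.channel = c1
```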
Thanks,
Hari

On Wed, Mar 25, 2015 at 12:02 PM, Adam Tannir <[email protected]> wrote:
> Hello,
>
> When running flume with kafka as a sink, an error is logged that
> "brokerList must contain at least one Kafka broker" but the line
> immediately previous shows the host:port entries as were entered in the
> config file and stored in the context.
>
> Everything works when I hardcode the host:port into the brokerList string
> and skip the failing test but that is a suboptimal solution. The kafka
> instances are from their quickstart guide and have no issues.
>
> Why isn't the value being selected from the context?
>
> flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java:
>
>   private static void addDocumentedKafkaProps(Context context,
>                                               Properties kafkaProps)
>       throws ConfigurationException {
>     String brokerList = context.getString(KafkaSinkConstants
>         .BROKER_LIST_FLUME_KEY);
>     if (brokerList == null) {
>       throw new ConfigurationException("brokerList must contain at least " +
>           "one Kafka broker");
>     }
>     kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
>
>     String requiredKey = context.getString(
>         KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
>
>     if (requiredKey != null) {
>       kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
>     }
>   }
>
> Config:
>
>   # Describe the sink
>   a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
>   a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
>   a1.sinks.k1.kafka.zookeeper.connect = localhost:2181
>   a1.sinks.k1.topic = test
>
> logs/flume.log:
>
>   25 Mar 2015 14:28:52,598 INFO  [conf-file-poller-0]
>   (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34) -
>   context={ parameters:{topic=test,
>   kafka.metadata.broker.list=localhost:9091,localhost:9092,
>   kafka.zookeeper.connect=localhost:2181,
>   type=org.apache.flume.sink.kafka.KafkaSink, channel=c1} }
>   25 Mar 2015 14:28:52,611 ERROR [conf-file-poller-0]
>   (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:427) - Sink
>   k1 has been removed due to an error during configuration
>   org.apache.flume.conf.ConfigurationException: brokerList must contain at
>   least one Kafka broker
>       at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
>       at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
>       at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
>       at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
>       at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
>       at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
>       at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>
>   git clone https://github.com/apache/flume.git
>   mvn compile install -DskipTests
>
> Version 1.6.0-SNAPSHOT from today
>
> Thanks!
