Thanks, Hari. Everything works as it should now. I had tried a few other configuration entries previously but they all had kafka after the sink name.
On Wed, Mar 25, 2015 at 3:15 PM, Hari Shreedharan <[email protected] > wrote: > Set this param: a1.sinks.k1.brokerList = <list of brokers> > instead of a1.sinks.k1.kafka.metadata.broker.list = > localhost:9091,localhost:9092 > > > Thanks, > Hari > > On Wed, Mar 25, 2015 at 12:02 PM, Adam Tannir <[email protected]> wrote: > >> Hello, >> >> When running flume with kafka as a sink, an error is logged that >> "brokerList must contain at least one Kafka broker" but the line >> immediately previous shows the host:port entries as were entered in the >> config file and stored in the context. >> >> Everything works when I hardcode the host:port into the brokerList string >> and skip the failing test but that is a suboptimal solution. The kafka >> instances are from their quickstart guide and have no issues. >> >> Why isn't the value being selected from the context? >> >> >> flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java: >> >> private static void addDocumentedKafkaProps(Context context, >> Properties kafkaProps) >> throws ConfigurationException { >> String brokerList = context.getString(KafkaSinkConstants >> .BROKER_LIST_FLUME_KEY); >> if (brokerList == null) { >> throw new ConfigurationException("brokerList must contain at least >> " + >> "one Kafka broker"); >> } >> kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList); >> >> String requiredKey = context.getString( >> KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY); >> >> if (requiredKey != null ) { >> kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey); >> } >> } >> >> >> >> Config: >> >> # Describe the sink >> a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink >> a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092 >> a1.sinks.k1.kafka.zookeeper.connect = localhost:2181 >> a1.sinks.k1.topic = test >> >> logs/flume.log >> >> 25 Mar 2015 14:28:52,598 INFO [conf-file-poller-0] >> (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34) - >> context={ parameters:{topic=test, >> kafka.metadata.broker.list=localhost:9091,localhost:9092, >> kafka.zookeeper.connect=localhost:2181, >> type=org.apache.flume.sink.kafka.KafkaSink, channel=c1} } >> 25 Mar 2015 14:28:52,611 ERROR [conf-file-poller-0] >> (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:427) - Sink >> k1 has been removed due to an error during configuration >> org.apache.flume.conf.ConfigurationException: brokerList must contain at >> least one Kafka broker >> at >> org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55) >> at >> org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37) >> at >> org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211) >> at >> org.apache.flume.conf.Configurables.configure(Configurables.java:41) >> at >> org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413) >> at >> org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98) >> at >> org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140) >> at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) >> at >> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:745) >> >> >> git clone https://github.com/apache/flume.git >> mvn compile install -DskipTests >> Version 1.6.0-SNAPSHOT from today >> >> Thanks! >> > >
