Hello,

When running Flume with Kafka as a sink, the error "brokerList must
contain at least one Kafka broker" is logged, yet the log line
immediately before it shows the host:port entries exactly as they were
entered in the config file and stored in the context.

Everything works when I hardcode the host:port into the brokerList string
and skip the failing null check, but that is a suboptimal solution. The
Kafka instances were set up from the Kafka quickstart guide and have no
issues.

Why isn't the value being read from the context?

flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java:

    private static void addDocumentedKafkaProps(Context context,
                                                Properties kafkaProps)
        throws ConfigurationException {
      String brokerList = context.getString(KafkaSinkConstants
          .BROKER_LIST_FLUME_KEY);
      if (brokerList == null) {
        throw new ConfigurationException("brokerList must contain at least " +
            "one Kafka broker");
      }
      kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
      String requiredKey = context.getString(
          KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
      if (requiredKey != null) {
        kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
      }
    }

Config:

    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
    a1.sinks.k1.kafka.zookeeper.connect = localhost:2181
    a1.sinks.k1.topic = test

logs/flume.log:

25 Mar 2015 14:28:52,598 INFO [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34) - context={ parameters:{topic=test, kafka.metadata.broker.list=localhost:9091,localhost:9092, kafka.zookeeper.connect=localhost:2181, type=org.apache.flume.sink.kafka.KafkaSink, channel=c1} }
25 Mar 2015 14:28:52,611 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:427) - Sink k1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
    at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
    at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
    at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
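
To narrow this down, here is a minimal sketch of what I think may be happening. It uses a plain java.util.Map in place of Flume's Context and copies the parameters from the context={...} log line. The class name, the helper names, and above all the value "brokerList" for BROKER_LIST_FLUME_KEY are my assumptions; I have not confirmed the constant's value in KafkaSinkConstants. The sketch only shows that a lookup under a key that differs from the configured one returns null and produces the same message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BrokerListLookupSketch {

    // ASSUMPTION: hypothetical stand-in for
    // KafkaSinkConstants.BROKER_LIST_FLUME_KEY; the real value is unverified.
    static final String ASSUMED_FLUME_KEY = "brokerList";

    // The key that actually appears in the logged context parameters.
    static final String CONFIGURED_KEY = "kafka.metadata.broker.list";

    /** Parameters copied from the context={...} line in logs/flume.log. */
    static Map<String, String> loggedParameters() {
        Map<String, String> parameters = new LinkedHashMap<>();
        parameters.put("topic", "test");
        parameters.put(CONFIGURED_KEY, "localhost:9091,localhost:9092");
        parameters.put("kafka.zookeeper.connect", "localhost:2181");
        parameters.put("type", "org.apache.flume.sink.kafka.KafkaSink");
        parameters.put("channel", "c1");
        return parameters;
    }

    /**
     * Same null check as addDocumentedKafkaProps, applied to a plain Map
     * instead of a Flume Context (hypothetical helper for illustration).
     */
    static String requireBrokerList(Map<String, String> parameters, String key) {
        String brokerList = parameters.get(key);
        if (brokerList == null) {
            throw new IllegalStateException(
                "brokerList must contain at least one Kafka broker");
        }
        return brokerList;
    }

    public static void main(String[] args) {
        Map<String, String> parameters = loggedParameters();

        // Lookup under the key used in my config file succeeds.
        System.out.println(requireBrokerList(parameters, CONFIGURED_KEY));

        // Lookup under the assumed constant value fails with the same message.
        try {
            requireBrokerList(parameters, ASSUMED_FLUME_KEY);
        } catch (IllegalStateException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
    }
}
```

If that assumption about the constant holds, the sink would expect the broker list under a1.sinks.k1.brokerList rather than a1.sinks.k1.kafka.metadata.broker.list, but I would appreciate confirmation.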

Build steps (version 1.6.0-SNAPSHOT, cloned from trunk today):

    git clone https://github.com/apache/flume.git
    mvn compile install -DskipTests

Thanks!