Hi guys, I'm interested in Flafka and I'm trying to build a Kafka-to-HDFS data flow on my own machine. I have some questions and hope someone can help me. Thanks in advance.
1. I tried a Kafka channel and a logger sink (no source; the logger sink is just for testing, eventually it will be an HDFS sink). The config file is:

  a1.sinks = sink1
  a1.channels = channel1
  a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
  a1.channels.channel1.brokerList = localhost:9093,localhost:9094
  a1.channels.channel1.topic = test
  a1.channels.channel1.zookeeperConnect = localhost:2181
  a1.channels.channel1.parseAsFlumeEvent = false
  a1.channels.cnannel1.kafka.consumer.timeout.ms = 1000000
  a1.sinks.sink1.channel = channel1
  a1.sinks.sink1.type = logger

and the error message I got is the following, which says it is unable to deliver the event:

  MACC02PHH5LG3QC:apache-flume-1.6.0-bin jun.ma$ bin/flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
  + exec /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/Users/jun.ma/apache-flume-1.6.0-bin/conf:/Users/jun.ma/apache-flume-1.6.0-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/example.conf --name a1
  2015-07-03 12:50:21,549 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
  2015-07-03 12:50:21,554 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/example.conf
  2015-07-03 12:50:21,562 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
  2015-07-03 12:50:21,562 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:sink1
  2015-07-03 12:50:21,562 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: sink1 Agent: a1
  2015-07-03 12:50:21,567 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:508)] Agent configuration for 'a1' has no sources.
  2015-07-03 12:50:21,569 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [a1]
  2015-07-03 12:50:21,570 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
  2015-07-03 12:50:21,576 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel channel1 type org.apache.flume.channel.kafka.KafkaChannel
  2015-07-03 12:50:21,588 (conf-file-poller-0) [INFO - org.apache.flume.channel.kafka.KafkaChannel.configure(KafkaChannel.java:168)] Group ID was not specified. Using flume as the group id.
  2015-07-03 12:50:21,602 (conf-file-poller-0) [INFO - org.apache.flume.channel.kafka.KafkaChannel.configure(KafkaChannel.java:188)] {metadata.broker.list=localhost:9093,localhost:9094, request.required.acks=-1, group.id=flume, zookeeper.connect=localhost:2181, consumer.timeout.ms=100, auto.commit.enable=false}
  2015-07-03 12:50:21,610 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel channel1
  2015-07-03 12:50:21,611 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: sink1, type: logger
  2015-07-03 12:50:21,614 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel channel1 connected to [sink1]
  2015-07-03 12:50:21,620 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4ae671f6 counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.kafka.KafkaChannel{name: channel1}} }
  2015-07-03 12:50:21,621 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel channel1
  2015-07-03 12:50:21,623 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:96)] Starting Kafka Channel: channel1
  2015-07-03 12:50:21,938 (lifecycleSupervisor-1-0) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Verifying properties
  2015-07-03 12:50:21,971 (lifecycleSupervisor-1-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] Property auto.commit.enable is not valid
  2015-07-03 12:50:21,972 (lifecycleSupervisor-1-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] Property consumer.timeout.ms is not valid
  2015-07-03 12:50:21,972 (lifecycleSupervisor-1-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] Property group.id is not valid
  2015-07-03 12:50:21,973 (lifecycleSupervisor-1-0) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property metadata.broker.list is overridden to localhost:9093,localhost:9094
  2015-07-03 12:50:21,973 (lifecycleSupervisor-1-0) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property request.required.acks is overridden to -1
  2015-07-03 12:50:21,973 (lifecycleSupervisor-1-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] Property zookeeper.connect is not valid
  2015-07-03 12:50:22,017 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:99)] Topic = test
  2015-07-03 12:50:22,018 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
  2015-07-03 12:50:22,018 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: channel1 started
  2015-07-03 12:50:22,018 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink sink1
  2015-07-03 12:50:22,029 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Verifying properties
  2015-07-03 12:50:22,029 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property auto.commit.enable is overridden to false
  2015-07-03 12:50:22,029 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property consumer.timeout.ms is overridden to 100
  2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property group.id is overridden to flume
  2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] Property metadata.broker.list is not valid
  2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)] Property request.required.acks is not valid
  2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property zookeeper.connect is overridden to localhost:2181
  2015-07-03 12:50:22,063 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] [flume_MACC02PHH5LG3QC-1435953022061-eaf69e13], Connecting to zookeeper instance at localhost:2181
  2015-07-03 12:50:22,065 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
  java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
          at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
          at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
          at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
          at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
          at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
          at java.lang.Thread.run(Thread.java:745)

I set up ZooKeeper and two brokers locally and have a custom producer generating data to the "test" topic. If I use my own consumer I can get the data delivered (see the P.S. below for how I check this). Another thing is that a1.channels.cnannel1.kafka.consumer.timeout.ms = 1000000 is not successfully picked up, and I don't know why.

2. How do I set up the HDFS path if I run the agent on my local machine? Currently I put hdfs-site.xml into the flume/conf directory, but I'm not sure if that is correct (I've put the HDFS sink config I plan to try at the end of this mail). Any suggestions are welcome, thanks!

Bests,
Jun
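P.S. Regarding the "my consumer can read the data" check in question 1: the producer and consumer are custom, but the check is roughly equivalent to reading the topic with the stock Kafka 0.8 console consumer against the same ZooKeeper, e.g.:

  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

and that does show the messages arriving in the "test" topic.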

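P.P.S. For question 2, this is roughly the HDFS sink config I eventually plan to try instead of the logger sink. The namenode address and path are just placeholders for my local single-node setup, so please correct me if any of this is off:

  a1.sinks.sink1.type = hdfs
  a1.sinks.sink1.channel = channel1
  # placeholder: local single-node HDFS with the namenode on the default 8020 port
  a1.sinks.sink1.hdfs.path = hdfs://localhost:8020/user/jun.ma/flume/%Y-%m-%d
  a1.sinks.sink1.hdfs.filePrefix = events
  # write plain text files instead of SequenceFiles
  a1.sinks.sink1.hdfs.fileType = DataStream
  # roll files by time only (every 60s), not by size or event count
  a1.sinks.sink1.hdfs.rollInterval = 60
  a1.sinks.sink1.hdfs.rollSize = 0
  a1.sinks.sink1.hdfs.rollCount = 0
  # my assumption: with parseAsFlumeEvent = false the events have no timestamp header,
  # so the %Y-%m-%d escapes need the sink to use the local time
  a1.sinks.sink1.hdfs.useLocalTimeStamp = true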