Hi guys,

I'm interested in Flafka and am trying to build a Kafka-to-HDFS data flow on 
my own machine. I have a couple of questions and hope someone can help; thanks 
in advance.

1. I tried a Kafka channel with a logger sink (no source; the logger sink is 
only for testing and will eventually be replaced by an HDFS sink). The config 
file is:

a1.sinks = sink1
a1.channels = channel1

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.brokerList = localhost:9093,localhost:9094
a1.channels.channel1.topic = test
a1.channels.channel1.zookeeperConnect = localhost:2181
a1.channels.channel1.parseAsFlumeEvent = false
a1.channels.cnannel1.kafka.consumer.timeout.ms = 1000000

a1.sinks.sink1.channel = channel1
a1.sinks.sink1.type = logger
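In case it matters: the topic is fed with plain (non-Flume-event) messages 
from a producer of my own, which is why parseAsFlumeEvent is false. As far as 
I can tell, the stock console producer would be an equivalent way to feed it, 
something like:

bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test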

The error message I got is the following, which says it is unable to deliver 
the event:

MACC02PHH5LG3QC:apache-flume-1.6.0-bin jun.ma$ bin/flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
+ exec /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/Users/jun.ma/apache-flume-1.6.0-bin/conf:/Users/jun.ma/apache-flume-1.6.0-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/example.conf --name a1
2015-07-03 12:50:21,549 (lifecycleSupervisor-1-0) [INFO - 
org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)]
 Configuration provider starting
2015-07-03 12:50:21,554 (conf-file-poller-0) [INFO - 
org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)]
 Reloading configuration file:conf/example.conf
2015-07-03 12:50:21,562 (conf-file-poller-0) [INFO - 
org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)]
 Processing:sink1
2015-07-03 12:50:21,562 (conf-file-poller-0) [INFO - 
org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)]
 Processing:sink1
2015-07-03 12:50:21,562 (conf-file-poller-0) [INFO - 
org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)]
 Added sinks: sink1 Agent: a1
2015-07-03 12:50:21,567 (conf-file-poller-0) [WARN - 
org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:508)]
 Agent configuration for 'a1' has no sources.
2015-07-03 12:50:21,569 (conf-file-poller-0) [INFO - 
org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)]
 Post-validation flume configuration contains configuration for agents: [a1]
2015-07-03 12:50:21,570 (conf-file-poller-0) [INFO - 
org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)]
 Creating channels
2015-07-03 12:50:21,576 (conf-file-poller-0) [INFO - 
org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)]
 Creating instance of channel channel1 type 
org.apache.flume.channel.kafka.KafkaChannel
2015-07-03 12:50:21,588 (conf-file-poller-0) [INFO - 
org.apache.flume.channel.kafka.KafkaChannel.configure(KafkaChannel.java:168)] 
Group ID was not specified. Using flume as the group id.
2015-07-03 12:50:21,602 (conf-file-poller-0) [INFO - 
org.apache.flume.channel.kafka.KafkaChannel.configure(KafkaChannel.java:188)] 
{metadata.broker.list=localhost:9093,localhost:9094, request.required.acks=-1, 
group.id=flume, zookeeper.connect=localhost:2181, 
consumer.timeout.ms=100, auto.commit.enable=false}
2015-07-03 12:50:21,610 (conf-file-poller-0) [INFO - 
org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)]
 Created channel channel1
2015-07-03 12:50:21,611 (conf-file-poller-0) [INFO - 
org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] 
Creating instance of sink: sink1, type: logger
2015-07-03 12:50:21,614 (conf-file-poller-0) [INFO - 
org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)]
 Channel channel1 connected to [sink1]
2015-07-03 12:50:21,620 (conf-file-poller-0) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:138)] 
Starting new configuration:{ sourceRunners:{} sinkRunners:{sink1=SinkRunner: { 
policy:org.apache.flume.sink.DefaultSinkProcessor@4ae671f6 counterGroup:{ 
name:null counters:{} } }} 
channels:{channel1=org.apache.flume.channel.kafka.KafkaChannel{name: channel1}} 
}
2015-07-03 12:50:21,621 (conf-file-poller-0) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:145)] 
Starting Channel channel1
2015-07-03 12:50:21,623 (lifecycleSupervisor-1-0) [INFO - 
org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:96)] 
Starting Kafka Channel: channel1
2015-07-03 12:50:21,938 (lifecycleSupervisor-1-0) [INFO - 
kafka.utils.Logging$class.info(Logging.scala:68)] Verifying properties
2015-07-03 12:50:21,971 (lifecycleSupervisor-1-0) [WARN - 
kafka.utils.Logging$class.warn(Logging.scala:83)] Property auto.commit.enable 
is not valid
2015-07-03 12:50:21,972 (lifecycleSupervisor-1-0) [WARN - 
kafka.utils.Logging$class.warn(Logging.scala:83)] Property consumer.timeout.ms 
is not valid
2015-07-03 12:50:21,972 (lifecycleSupervisor-1-0) [WARN - 
kafka.utils.Logging$class.warn(Logging.scala:83)] Property group.id 
is not valid
2015-07-03 12:50:21,973 (lifecycleSupervisor-1-0) [INFO - 
kafka.utils.Logging$class.info(Logging.scala:68)] Property 
metadata.broker.list is overridden to localhost:9093,localhost:9094
2015-07-03 12:50:21,973 (lifecycleSupervisor-1-0) [INFO - 
kafka.utils.Logging$class.info(Logging.scala:68)] Property 
request.required.acks is overridden to -1
2015-07-03 12:50:21,973 (lifecycleSupervisor-1-0) [WARN - 
kafka.utils.Logging$class.warn(Logging.scala:83)] Property zookeeper.connect is 
not valid
2015-07-03 12:50:22,017 (lifecycleSupervisor-1-0) [INFO - 
org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:99)] Topic 
= test
2015-07-03 12:50:22,018 (lifecycleSupervisor-1-0) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)]
 Monitored counter group for type: CHANNEL, name: channel1: Successfully 
registered new MBean.
2015-07-03 12:50:22,018 (lifecycleSupervisor-1-0) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)]
 Component type: CHANNEL, name: channel1 started
2015-07-03 12:50:22,018 (conf-file-poller-0) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:173)] 
Starting Sink sink1
2015-07-03 12:50:22,029 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
kafka.utils.Logging$class.info(Logging.scala:68)] Verifying properties
2015-07-03 12:50:22,029 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
kafka.utils.Logging$class.info(Logging.scala:68)] Property 
auto.commit.enable is overridden to false
2015-07-03 12:50:22,029 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
kafka.utils.Logging$class.info(Logging.scala:68)] Property 
consumer.timeout.ms is overridden to 100
2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
kafka.utils.Logging$class.info(Logging.scala:68)] Property 
group.id is overridden to flume
2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - 
kafka.utils.Logging$class.warn(Logging.scala:83)] Property metadata.broker.list 
is not valid
2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - 
kafka.utils.Logging$class.warn(Logging.scala:83)] Property 
request.required.acks is not valid
2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
kafka.utils.Logging$class.info(Logging.scala:68)] Property 
zookeeper.connect is overridden to localhost:2181
2015-07-03 12:50:22,063 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
kafka.utils.Logging$class.info(Logging.scala:68)] 
[flume_MACC02PHH5LG3QC-1435953022061-eaf69e13], Connecting to zookeeper 
instance at localhost:2181
2015-07-03 12:50:22,065 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR 
- org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to 
deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you 
must either commit or rollback first
        at 
com.google.common.base.Preconditions.checkState(Preconditions.java:172)
        at 
org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
        at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)

I set up ZooKeeper and two brokers locally and have a custom producer 
generating data to the "test" topic. If I use my own consumer I can get the 
data delivered. Another thing is that 
a1.channels.cnannel1.kafka.consumer.timeout.ms = 1000000 is not being picked 
up, and I don't know why.
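(Looking at it again, I notice the channel name is spelled "cnannel1" on that 
line in my config, so maybe Flume just ignores it and falls back to the 100 ms 
default shown in the log? If so, I suppose the line should read:

a1.channels.channel1.kafka.consumer.timeout.ms = 1000000

but please correct me if the property itself is also wrong.)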

2. How do I set up the HDFS path if I run the agent on my local machine? 
Currently I just put hdfs-site.xml into the flume/conf directory, and I'm not 
sure that is correct. Any suggestions are welcome, thanks!
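To make the question concrete, here is roughly the sink config I had in mind 
once I switch from the logger sink to HDFS — the namenode host/port and the 
path are just placeholders for my local setup, so please correct anything 
that's off:

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = channel1
a1.sinks.sink1.hdfs.path = hdfs://localhost:8020/flume/events/%Y-%m-%d
a1.sinks.sink1.hdfs.fileType = DataStream
a1.sinks.sink1.hdfs.useLocalTimeStamp = true

Is pointing hdfs.path at a full hdfs:// URI like this enough, or do I still 
need hdfs-site.xml / core-site.xml (and the Hadoop client jars) on the Flume 
classpath?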

Best,
Jun
