Allan, thanks for the reply. In my case, I only used one channel and one sink 
at the same time.
 
About 10 minutes after the data were sent to the Flume agent, some messages 
were logged in flume.log (see below). They say that the class 
org/elasticsearch/common/transport/TransportAddress was not found. This seems 
to indicate that the Cloudera version of Flume does not support 
ElasticSearchSink. Is there any way to add the missing class or a jar file?
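 
A rough sketch of how one could check whether any jar in Flume's lib directory 
provides the missing class. The helper name find_class_in_jars and the lib path 
in the usage note are placeholders of mine, and it assumes unzip is installed:

```shell
#!/bin/sh
# Hypothetical helper: print every jar in a directory that contains the given class file.
# Usage: find_class_in_jars <lib-dir> <path/inside/jar/Name.class>
find_class_in_jars() {
    dir=$1
    class_file=$2
    status=1                                  # 1 = class not found in any jar
    for jar in "$dir"/*.jar; do
        [ -e "$jar" ] || continue             # the glob matched nothing
        # List the jar's entries and look for the class file as a fixed string.
        if unzip -l "$jar" 2>/dev/null | grep -qF -- "$class_file"; then
            echo "$jar"
            status=0
        fi
    done
    return $status
}
```

For example: find_class_in_jars /path/to/flume/lib 
org/elasticsearch/common/transport/TransportAddress.class 
If nothing is printed, no jar in that directory provides the class.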
 
I also tried to download Flume from the Flume site:
 
http://flume.apache.org/download.html
http://www.apache.org/dyn/closer.cgi/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
 
But the downloaded apache-flume-1.3.1-bin.tar.gz is reported as neither a gzip 
file nor a tar file on my Linux box (Red Hat 5). Can anyone let me know the 
exact download process? If possible, please provide step-by-step 
instructions for downloading and installation.
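 
One possible cause, stated with some caution: the closer.cgi link normally 
serves an HTML mirror-selection page rather than the archive itself, so saving 
it directly would give an HTML file with a .tar.gz name. A minimal sketch for 
checking the file before unpacking; is_gzip is a hypothetical helper that only 
looks for the gzip magic bytes 1f 8b:

```shell
#!/bin/sh
# Hypothetical helper: succeed only if the file starts with the gzip magic bytes (1f 8b).
is_gzip() {
    # Read the first two bytes as hex and strip the whitespace od adds.
    magic=$(od -An -tx1 -N2 "$1" 2>/dev/null | tr -d ' \n')
    [ "$magic" = "1f8b" ]
}

# Example use (file name taken from the message above):
# if is_gzip apache-flume-1.3.1-bin.tar.gz; then
#     tar -xzf apache-flume-1.3.1-bin.tar.gz
# else
#     head -c 200 apache-flume-1.3.1-bin.tar.gz   # HTML here means a mirror page was saved
# fi
```

If the check fails, downloading through a direct link from one of the mirrors 
listed on that page should give a real tarball.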
 
Thanks.
 
Shushuai
 
 
-------------------------------------------------------------------------------------------------------------
11 Jun 2013 19:40:37,082 INFO  [lifecycleSupervisor-1-0] 
(org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:61)  - 
Configuration provider starting
11 Jun 2013 19:40:37,114 INFO  [conf-file-poller-0] 
(org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:133)
  - Reloading configuration file:conf/flume.conf
11 Jun 2013 19:40:37,121 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016)  
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016)  
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016)  
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016)  
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016)  
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:930)  
- Added sinks: k1 Agent: agent1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016)  
- Processing:k1
11 Jun 2013 19:40:37,123 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016)  
- Processing:k1
11 Jun 2013 19:40:37,123 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016)  
- Processing:k1
11 Jun 2013 19:40:37,457 INFO  [conf-file-poller-0] 
(org.apache.flume.conf.FlumeConfiguration.validateConfiguration:140)  - 
Post-validation flume configuration contains configuration for agents: [agent1]
11 Jun 2013 19:40:37,457 INFO  [conf-file-poller-0] 
(org.apache.flume.node.AbstractConfigurationProvider.loadChannels:150)  - 
Creating channels
11 Jun 2013 19:40:37,464 INFO  [conf-file-poller-0] 
(org.apache.flume.channel.DefaultChannelFactory.create:40)  - Creating instance 
of channel ch1 type memory
11 Jun 2013 19:40:37,468 INFO  [conf-file-poller-0] 
(org.apache.flume.node.AbstractConfigurationProvider.loadChannels:205)  - 
Created channel ch1
11 Jun 2013 19:40:37,469 INFO  [conf-file-poller-0] 
(org.apache.flume.source.DefaultSourceFactory.create:39)  - Creating instance 
of source avro-source1, type avro
11 Jun 2013 19:40:37,484 INFO  [conf-file-poller-0] 
(org.apache.flume.sink.DefaultSinkFactory.create:40)  - Creating instance of 
sink: k1, type: org.apache.flume.sink.elasticsearch.ElasticSearchSink
11 Jun 2013 19:40:37,489 ERROR [conf-file-poller-0] 
(org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:145)
  - Failed to start agent because dependencies were not found in classpath. 
Error follows.
java.lang.NoClassDefFoundError: 
org/elasticsearch/common/transport/TransportAddress
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:186)
        at 
org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67)
        at 
org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
        at 
org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
        at 
org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
        at 
org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at 
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: 
org.elasticsearch.common.transport.TransportAddress
        at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
        ... 15 more
----------------------------------------------------------------------------------------------------------
 

________________________________
 From: Allan Feid <[email protected]>
To: [email protected]; shushuai zhu <[email protected]> 
Sent: Wednesday, June 12, 2013 10:09 AM
Subject: Re: ElasticSearchSink does not work

Hi Shushuai,

I've had a similar issue, and in my case it was because I was using the same 
channel for multiple sinks. I believe what happens is whatever sink can remove 
the event from the queue first will have it written out, but I don't know the 
specifics since I haven't had a chance to read through the codebase. If you add 
a second channel for your elasticsearch sink and make sure your avro-source 
writes to both channels, you should see data going to both locations. 

Thanks,
Allan


On Tue, Jun 11, 2013 at 10:37 PM, shushuai zhu <[email protected]> wrote:

>Hi,
>
>I am new to Flume. I am trying to send data with the Flume Client Perl API to 
>a Flume Avro source and then through ElasticSearchSink to an ElasticSearch 
>engine. I could get the file_roll sink to work by sending the data to a file. 
>However, I am running into an issue with ElasticSearchSink. The data did not 
>go through to the ElasticSearch engine: 
>
>use Flume::Client;
>my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', 
>port => 41414);
>my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => 
>$ng_client);
>my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => 
>{}, body => "hello, this is sent from perl (using FlumeNG)"}]);
>print "$response\n";    # response will be 'OK' on success
>
>I say this because the returned $response is not defined (again, this worked 
>when the file_roll sink was used).
>
>The ElasticSearch engine is working, since I could load data into it via 
>LogStash's ElasticSearch output type. 
>
>The Flume agent was installed by downloading the tarball from Cloudera:
>
>http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz 
>
>The flume.conf is as follows. I tried setting "hostNames" to the full host 
>name, the IP address, etc. I do not see any output messages in flume.log. 
>Could someone let me know what could potentially cause the issue? 
>
>Thanks.
> 
>Shushuai
>
>
>
># Define a memory channel called ch1 on agent1
>agent1.channels = ch1
>agent1.channels.ch1.type = memory
>
># Define an Avro source called avro-source1 on agent1 and tell it to bind to 
># 0.0.0.0:41414. Connect it to channel ch1.
>agent1.sources = avro-source1
>agent1.sources.avro-source1.channels = ch1
>agent1.sources.avro-source1.type = avro
>agent1.sources.avro-source1.bind = 0.0.0.0
>agent1.sources.avro-source1.port = 41414
>
># Define a local file sink that simply logs all events it receives (this 
># works well)
>#agent1.sinks = localout
>#agent1.sinks.localout.type = file_roll
>#agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
>#agent1.sinks.localout.sink.rollInterval = 0
>#agent1.sinks.localout.channel = ch1
>
># Define ElasticSearchSink sink (this does not work)
>agent1.sinks = k1
>agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
>agent1.sinks.k1.hostNames = localhost:9300
>agent1.sinks.k1.indexName = flume
>agent1.sinks.k1.indexType = logs
>agent1.sinks.k1.clusterName = elasticsearch
>agent1.sinks.k1.batchSize = 2
>agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
>agent1.sinks.k1.channel = ch1
>
