Allan, thanks for the reply. In my case, I used only one channel and one sink at a time. About 10 minutes after the data were sent to the Flume agent, the messages below were logged in flume.log. They say that the class org/elasticsearch/common/transport/TransportAddress was not found, which seems to indicate that the Cloudera version of Flume does not support ElasticSearchSink. Is there any way to add the missing class, or some jar file that provides it?

I also tried to download Flume from the Flume site:

http://flume.apache.org/download.html
http://www.apache.org/dyn/closer.cgi/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz

but my Linux box (Red Hat 5) reports that the downloaded apache-flume-1.3.1-bin.tar.gz is neither a gzip file nor a tar file. Can anyone tell me the exact download process? If possible, please provide step-by-step instructions for downloading and installing it.

Thanks.

Shushuai

-------------------------------------------------------------------------------------------------------------
11 Jun 2013 19:40:37,082 INFO [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:61) - Configuration provider starting
11 Jun 2013 19:40:37,114 INFO [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:133) - Reloading configuration file:conf/flume.conf
11 Jun 2013 19:40:37,121 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:930) - Added sinks: k1 Agent: agent1
11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
11 Jun 2013 19:40:37,123 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
11 Jun 2013 19:40:37,123 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
11 Jun 2013 19:40:37,457 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:140) - Post-validation flume configuration contains configuration for agents: [agent1]
11 Jun 2013 19:40:37,457 INFO [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:150) - Creating channels
11 Jun 2013 19:40:37,464 INFO [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:40) - Creating instance of channel ch1 type memory
11 Jun 2013 19:40:37,468 INFO [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:205) - Created channel ch1
11 Jun 2013 19:40:37,469 INFO [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:39) - Creating instance of source avro-source1, type avro
11 Jun 2013 19:40:37,484 INFO [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:40) - Creating instance of sink: k1, type: org.apache.flume.sink.elasticsearch.ElasticSearchSink
11 Jun 2013 19:40:37,489 ERROR [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:145) - Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/elasticsearch/common/transport/TransportAddress
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:186)
    at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67)
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.transport.TransportAddress
    at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
    ... 15 more
----------------------------------------------------------------------------------------------------------
________________________________
From: Allan Feid <[email protected]>
To: [email protected]; shushuai zhu <[email protected]>
Sent: Wednesday, June 12, 2013 10:09 AM
Subject: Re: ElasticSearchSink does not work

Hi Shushuai,

I've had a similar issue, and in my case it was because I was using the same channel for multiple sinks. I believe what happens is that whichever sink takes an event from the channel first is the one that writes it out, but I don't know the specifics since I haven't had a chance to read through the codebase. If you add a second channel for your ElasticSearch sink and make sure your avro-source writes to both channels, you should see data going to both locations.

Thanks,
Allan

On Tue, Jun 11, 2013 at 10:37 PM, shushuai zhu <[email protected]> wrote:
> Hi,
>
> I am new to Flume. I am trying to send data with the Flume Client Perl API to a Flume Avro source, and from there through ElasticSearchSink to an ElasticSearch engine. I could make the file_roll sink work by sending the data to a file. However, I am running into an issue with ElasticSearchSink: the data did not go through to the ElasticSearch engine.
>
> use Flume::Client;
> my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port => 41414);
> my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
> my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body => "hello, this is sent from perl (using FlumeNG)"}]);
> print "$response\n"; # response will be 'OK' on success
>
> The returned $response is not defined (again, this worked when the file_roll sink was used).
>
> The ElasticSearch engine is working, since I could load data into it via LogStash's ElasticSearch output type.
>
> The Flume agent was installed by downloading the tarball from Cloudera:
>
> http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz
>
> The flume.conf is as follows. I have tried different values for "hostNames" (full host name, IP address, etc.), but I do not see any output messages in flume.log. Could someone let me know what could be causing the issue?
>
> Thanks.
>
> Shushuai
>
> # Define a memory channel called ch1 on agent1
> agent1.channels = ch1
> agent1.channels.ch1.type = memory
>
> # Define an Avro source called avro-source1 on agent1 and tell it to bind to 0.0.0.0:41414. Connect it to channel ch1.
> agent1.sources = avro-source1
> agent1.sources.avro-source1.channels = ch1
> agent1.sources.avro-source1.type = avro
> agent1.sources.avro-source1.bind = 0.0.0.0
> agent1.sources.avro-source1.port = 41414
>
> # Define a local file sink that simply logs all events it receives (this works well)
> #agent1.sinks = localout
> #agent1.sinks.localout.type = file_roll
> #agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
> #agent1.sinks.localout.sink.rollInterval = 0
> #agent1.sinks.localout.channel = ch1
>
> # Define ElasticSearchSink sink (this does not work)
> agent1.sinks = k1
> agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> agent1.sinks.k1.hostNames = localhost:9300
> agent1.sinks.k1.indexName = flume
> agent1.sinks.k1.indexType = logs
> agent1.sinks.k1.clusterName = elasticsearch
> agent1.sinks.k1.batchSize = 2
> agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
> agent1.sinks.k1.channel = ch1
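Allan's suggestion of giving the ElasticSearch sink its own channel could look roughly like the flume.conf sketch below. It reuses the property values from Shushuai's configuration and re-enables the file_roll sink on the original channel. This is a sketch under stated assumptions, not a tested configuration: the second channel name ch2 is hypothetical, and the source relies on the replicating channel selector (Flume's default, set explicitly here), which copies every event into each channel listed for the source. It addresses sinks competing for one shared channel; it would not by itself fix the NoClassDefFoundError in the log above, which points to the Elasticsearch client classes missing from the agent's classpath.

```properties
# Two memory channels: ch1 feeds the file_roll sink, ch2 (hypothetical name) feeds ElasticSearchSink
agent1.channels = ch1 ch2
agent1.channels.ch1.type = memory
agent1.channels.ch2.type = memory

# The Avro source writes each event to BOTH channels (replicating selector, the default)
agent1.sources = avro-source1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
agent1.sources.avro-source1.channels = ch1 ch2
agent1.sources.avro-source1.selector.type = replicating

# Both sinks are declared on one line; each sink drains exactly one channel
agent1.sinks = localout k1

agent1.sinks.localout.type = file_roll
agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
agent1.sinks.localout.sink.rollInterval = 0
agent1.sinks.localout.channel = ch1

agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.k1.hostNames = localhost:9300
agent1.sinks.k1.indexName = flume
agent1.sinks.k1.indexType = logs
agent1.sinks.k1.clusterName = elasticsearch
agent1.sinks.k1.batchSize = 2
agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
agent1.sinks.k1.channel = ch2
```

Note that a source takes a plural "channels" list while each sink takes a single "channel". With one shared channel, each event is delivered to only one of the sinks; with one channel per sink, each sink gets its own copy.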
