Matt, thanks. I had mentioned in an earlier mail that the problem was resolved, in the same way as you described.

Shushuai
________________________________
From: Matt Wise <[email protected]>
To: [email protected]; shushuai zhu <[email protected]>
Sent: Tuesday, June 18, 2013 2:36 PM
Subject: Re: ElasticSearchSink does not work

In order for me to use the elasticsearch sink, I had to install the ElasticSearch JAR packages onto my Flume nodes and add them to the environment through the flume-env.sh script. Here's our puppet flume-env.sh template:

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
JAVA_OPTS="-Xms100m -Xmx<%= flume_max_mem.to_i %>m -Dcom.sun.management.jmxremote -Dflume.monitoring.type=http -Dflume.monitoring.port=<%= flume_monitoring_port %>"

# Note that the Flume conf directory is always included in the classpath.
FLUME_CLASSPATH="<%= elasticsearch_dest %>/lib/*"

--Matt

On Jun 12, 2013, at 8:04 AM, shushuai zhu <[email protected]> wrote:

> Hi, just a quick update. Found some other site to download apache-flume-1.3.1-bin.tar.gz like:
>
> http://apache.mesi.com.ar/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
>
> After the installation and running, still got the same class not found exception (see below log messages). I suspect some additional jar file is needed for ElasticSearchSink. Anyone have any idea?
>
> Thanks.
>
> Shushuai
>
> ________________________________
> From: shushuai zhu <[email protected]>
> To: "[email protected]" <[email protected]>
> Sent: Wednesday, June 12, 2013 10:35 AM
> Subject: Re: ElasticSearchSink does not work
>
> Allan, thanks for the reply. In my case, I only used one channel and one sink at the same time.
>
> About 10 minutes after the data were sent to the Flume agent, some messages were logged in flume.log (see below). It says class org/elasticsearch/common/transport/TransportAddress was not found. This seems to indicate that the Cloudera version of Flume does not support ElasticSearchSink. Any way to add the missing class or some jar file?
>
> I also tried to download Flume from the Flume site:
>
> http://flume.apache.org/download.html
> http://www.apache.org/dyn/closer.cgi/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
>
> But the downloaded apache-flume-1.3.1-bin.tar.gz is reported as neither a gzip file nor a tar file on my Linux box (Red Hat 5). Can anyone let me know the exact download process? If possible, please provide some step-by-step instructions for downloading and installation.
>
> Thanks.
>
> Shushuai
>
> ----------------------------------------------------------------------
> 11 Jun 2013 19:40:37,082 INFO [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:61) - Configuration provider starting
> 11 Jun 2013 19:40:37,114 INFO [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:133) - Reloading configuration file:conf/flume.conf
> 11 Jun 2013 19:40:37,121 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:930) - Added sinks: k1 Agent: agent1
> 11 Jun 2013 19:40:37,122 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,123 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,123 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,457 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:140) - Post-validation flume configuration contains configuration for agents: [agent1]
> 11 Jun 2013 19:40:37,457 INFO [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:150) - Creating channels
> 11 Jun 2013 19:40:37,464 INFO [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:40) - Creating instance of channel ch1 type memory
> 11 Jun 2013 19:40:37,468 INFO [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:205) - Created channel ch1
> 11 Jun 2013 19:40:37,469 INFO [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:39) - Creating instance of source avro-source1, type avro
> 11 Jun 2013 19:40:37,484 INFO [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:40) - Creating instance of sink: k1, type: org.apache.flume.sink.elasticsearch.ElasticSearchSink
> 11 Jun 2013 19:40:37,489 ERROR [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:145) - Failed to start agent because dependencies were not found in classpath. Error follows.
> java.lang.NoClassDefFoundError: org/elasticsearch/common/transport/TransportAddress
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:186)
>     at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67)
>     at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
>     at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
>     at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
>     at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
> Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.transport.TransportAddress
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
>     ... 15 more
> ----------------------------------------------------------------------
>
> ________________________________
> From: Allan Feid <[email protected]>
> To: [email protected]; shushuai zhu <[email protected]>
> Sent: Wednesday, June 12, 2013 10:09 AM
> Subject: Re: ElasticSearchSink does not work
>
> Hi Shushuai,
>
> I've had a similar issue, and in my case it was because I was using the same channel for multiple sinks. I believe what happens is whatever sink can remove the event from the queue first will have it written out, but I don't know the specifics since I haven't had a chance to read through the codebase. If you add a second channel for your elasticsearch sink and make sure your avro-source writes to both channels, you should see data going to both locations.
>
> Thanks,
> Allan
>
> On Tue, Jun 11, 2013 at 10:37 PM, shushuai zhu <[email protected]> wrote:
>
>> Hi,
>>
>> I am new to Flume. I am trying to send data using the Flume Client Perl API to a Flume Avro source, then through ElasticSearchSink to an ElasticSearch engine. I could make the file_roll sink work by sending the data to some file. However, I am encountering an issue with ElasticSearchSink. The data did not go through to the ElasticSearch engine:
>>
>> use Flume::Client;
>> my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port => 41414);
>> my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
>> my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body => "hello, this is sent from perl (using FlumeNG)"}]);
>> print "$response\n"; # response will be 'OK' on success
>>
>> since the returned $response is not defined (again this worked when file_roll sink was used).
>>
>> The ElasticSearch engine is working since I could load data to it via LogStash's ElasticSearch output type.
>>
>> The Flume agent was installed by downloading the tarball from Cloudera:
>>
>> http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz
>>
>> The flume.conf is as follows. I played around with "hostNames", using the full name, IP address, etc. I do not see any output message in flume.log. Could someone let me know what could potentially cause the issue?
>>
>> Thanks.
>>
>> Shushuai
>>
>> # Define a memory channel called ch1 on agent1
>> agent1.channels = ch1
>> agent1.channels.ch1.type = memory
>>
>> # Define an Avro source called avro-source1 on agent1 and tell it to bind to 0.0.0.0:41414. Connect it to channel ch1.
>> agent1.sources = avro-source1
>> agent1.sources.avro-source1.channels = ch1
>> agent1.sources.avro-source1.type = avro
>> agent1.sources.avro-source1.bind = 0.0.0.0
>> agent1.sources.avro-source1.port = 41414
>>
>> # Define a local file sink that simply logs all events it receives (this works well)
>> #agent1.sinks = localout
>> #agent1.sinks.localout.type = file_roll
>> #agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
>> #agent1.sinks.localout.sink.rollInterval = 0
>> #agent1.sinks.localout.channel = ch1
>>
>> # Define ElasticSearchSink sink (this does not work)
>> agent1.sinks = k1
>> agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
>> agent1.sinks.k1.hostNames = localhost:9300
>> agent1.sinks.k1.indexName = flume
>> agent1.sinks.k1.indexType = logs
>> agent1.sinks.k1.clusterName = elasticsearch
>> agent1.sinks.k1.batchSize = 2
>> agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
>> agent1.sinks.k1.channel = ch1
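
For anyone hitting the same NoClassDefFoundError: one way to confirm that the directory put on FLUME_CLASSPATH really provides the missing class is to scan its JARs for the class entry. The following is only a minimal Python sketch and is not part of Flume or ElasticSearch. The function name find_class_in_jars and the directory you pass in are hypothetical; the only fact it relies on is that a class a.b.C is stored in a JAR (a zip archive) as the entry a/b/C.class.

```python
import zipfile
from pathlib import Path


def find_class_in_jars(lib_dir, class_name):
    """Return the JAR files directly under lib_dir that contain class_name.

    lib_dir is a hypothetical path, e.g. the ElasticSearch lib directory
    that FLUME_CLASSPATH points at. class_name is the dotted Java name.
    """
    # A class a.b.C is stored inside a JAR as the zip entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    matches.append(jar)
        except zipfile.BadZipFile:
            # Skip truncated or corrupt files instead of aborting the scan.
            continue
    return matches
```

Calling find_class_in_jars with the ElasticSearch lib directory and "org.elasticsearch.common.transport.TransportAddress" returns the JARs that provide the class. An empty list suggests the directory on the classpath does not contain it, which would be consistent with the stack trace above.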
