Matt, thanks. I had mentioned in an early mail that the problem was resolved, 
in the same way as you mentioned.
 
Shushuai 
 

________________________________
 From: Matt Wise <[email protected]>
To: [email protected]; shushuai zhu <[email protected]> 
Sent: Tuesday, June 18, 2013 2:36 PM
Subject: Re: ElasticSearchSink does not work
  


In order for me to use the elasticsearch sink, I had to install the 
ElasticSearch JAR packages onto my Flume nodes and add them to the environment 
through the flume-env.sh script. Here's our puppet flume-env.sh template:

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
JAVA_OPTS="-Xms100m -Xmx<%= flume_max_mem.to_i %>m 
-Dcom.sun.management.jmxremote -Dflume.monitoring.type=http 
-Dflume.monitoring.port=<%= flume_monitoring_port %>"

>
# Note that the Flume conf directory is always included in the classpath.
FLUME_CLASSPATH="<%= elasticsearch_dest %>/lib/*"

--Matt

On Jun 12, 2013, at 8:04 AM, shushuai zhu <[email protected]> wrote:

Hi, just a quick update. Found some other site to download 
apache-flume-1.3.1-bin.tar.gz like:
> 
>http://apache.mesi.com.ar/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
>
>After the installation and running, still got the same class not found 
>exception (see below log messages). I suspect some additional jar file is 
>needed for ElasticSearchSink. Anyone have any idea?
>
>Thanks.
>
>Shushuai
>
>
>
>________________________________
> From: shushuai zhu <[email protected]>
>To: "[email protected]" <[email protected]> 
>Sent: Wednesday, June 12, 2013 10:35 AM
>Subject: Re: ElasticSearchSink does not work
>  
>
>
>Allan, thanks for the reply. In my case, I only used one channel and one sink 
>at the same time.
> 
>About 10 minutes after the data were sent to the Flume agent, some messages 
>were logged in flume.log (see below). It says class 
>org/elasticsearch/common/transport/TransportAddress was not found. This seems 
>indicating that the Cloudera version of Flume does not support 
>ElasticSearchSink. Anyway to add the missing class or some jar file?
> 
>I also tried to download the flume from Flume site:
> 
>http://flume.apache.org/download.html
>http://www.apache.org/dyn/closer.cgi/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
> 
>But the downloaded apache-flume-1.3.1-bin.tar.gz is complained as not a gzip 
>file nor a tar file on my Linux box (Red Hat 5). Can anyone let me know the 
>exact downloading process? If possible, please provide some step-by-step 
>instruction for downloading and installation.
> 
>Thanks.
> 
>Shushuai
> 
> 
>-------------------------------------------------------------------------------------------------------------
>11 Jun 2013 19:40:37,082 INFO  [lifecycleSupervisor-1-0] 
>(org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:61)  - 
>Configuration provider starting
>11 Jun 2013 19:40:37,114 INFO  [conf-file-poller-0] 
>(org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:133)
>  - Reloading configuration file:conf/flume.conf
>11 Jun 2013 19:40:37,121 INFO  [conf-file-poller-0] 
>(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
> - Processing:k1
>11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
>(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
> - Processing:k1
>11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
>(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
> - Processing:k1
>11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
>(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
> -
 Processing:k1
>11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
>(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
> - Processing:k1
>11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
>(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:930)  
>- Added sinks: k1 Agent: agent1
>11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] 
>(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
> - Processing:k1
>11 Jun 2013 19:40:37,123 INFO  [conf-file-poller-0] 
>(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
> - Processing:k1
>11 Jun 2013 19:40:37,123 INFO  [conf-file-poller-0] 
>(org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
> - Processing:k1
>11 Jun 2013 19:40:37,457 INFO  [conf-file-poller-0]
 (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:140)  - 
Post-validation flume configuration contains configuration for agents: [agent1]
>11 Jun 2013 19:40:37,457 INFO  [conf-file-poller-0] 
>(org.apache.flume.node.AbstractConfigurationProvider.loadChannels:150)  - 
>Creating channels
>11 Jun 2013 19:40:37,464 INFO  [conf-file-poller-0] 
>(org.apache.flume.channel.DefaultChannelFactory.create:40)  - Creating 
>instance of channel ch1 type memory
>11 Jun 2013 19:40:37,468 INFO  [conf-file-poller-0] 
>(org.apache.flume.node.AbstractConfigurationProvider.loadChannels:205)  - 
>Created channel ch1
>11 Jun 2013 19:40:37,469 INFO  [conf-file-poller-0] 
>(org.apache.flume.source.DefaultSourceFactory.create:39)  - Creating instance 
>of source avro-source1, type avro
>11 Jun 2013 19:40:37,484 INFO  [conf-file-poller-0] 
>(org.apache.flume.sink.DefaultSinkFactory.create:40)  - Creating instance of
 sink: k1, type: org.apache.flume.sink.elasticsearch.ElasticSearchSink
>11 Jun 2013 19:40:37,489 ERROR [conf-file-poller-0] 
>(org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:145)
>  - Failed to start agent because dependencies were not found in classpath. 
>Error follows.
>java.lang.NoClassDefFoundError: 
>org/elasticsearch/common/transport/TransportAddress
>        at java.lang.Class.forName0(Native Method)
>        at java.lang.Class.forName(Class.java:186)
>        at 
>org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67)
>        at 
>org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
>        at
 
org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
>        at 
>org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
>        at 
>org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
>        at 
>java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>        at 
>java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>        at
 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
>        at 
>java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
>        at 
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>        at 
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>        at java.lang.Thread.run(Thread.java:679)
>Caused by: java.lang.ClassNotFoundException: 
>org.elasticsearch.common.transport.TransportAddress
>        at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
>        at java.security.AccessController.doPrivileged(Native
 Method)
>        at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
>        at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
>        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
>        at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
>        ... 15 more
>----------------------------------------------------------------------------------------------------------
>
> 
>
>________________________________
> From: Allan Feid <[email protected]>
>To: [email protected]; shushuai zhu <[email protected]> 
>Sent: Wednesday, June 12, 2013 10:09 AM
>Subject: Re: ElasticSearchSink does not work
>
>Hi Shushuai,
>
>I've had a similar issue, and in my case it was because I was using the same 
>channel for multiple sinks. I believe what happens is whatever sink can remove 
>the event from the queue first will have it written out, but I don't know the 
>specifics since I haven't had a chance to read through the codebase. If you 
>add a second channel for your elasticsearch sink and make sure your
 avro-source writes to both channels, you should see data going to both 
locations. 
>
>Thanks,
>Allan
>
>
>On Tue, Jun 11, 2013 at 10:37 PM, shushuai zhu <[email protected]> wrote:
>
>Hi,
>>
>>I am new to Flume. I am trying to send data using Flume Client perl API to 
>>Flume Avro source then ElasticSearchSink to an ElasticSearch engine. I could 
>>make the file_roll sink to work by sending the data to some file. However, I 
>>am encountering issue with ElasticSearchSink. The data did not go through to 
>>ElasticSearch engine: 
>>
>>use Flume::Client;
>>my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', 
>>port => 41414);
>>my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => 
>>$ng_client);
>>my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => 
>>{}, body => "hello, this is sent from perl (using FlumeNG)"}]);
>>print "$response\n";    # response will be 'OK'
 on success
>>
>>since the returned $response is not defined (again this worked when file_roll 
>>sink was used).
>>
>>The ElasticSearch engine is working since I could load data to it via 
>>LogStash's EalsticSearch output type. 
>>
>>The Flume agent was installed by downloading tarball from Cloudera:
>>
>>http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz 
>>
>>The flume.conf is as followings. I played around the "hostNames" with full 
>>name, IP address, etc. Do not see output message in flume.log. Could someone 
>>let me know what could potentially cause the issue? 
>>
>>Thanks.
>> 
>>Shushuai
>>
>>
>>
>># Define a memory channel called ch1 on agent1
>>agent1.channels = ch1
>>agent1.channels.ch1.type = memory
>>
>># Define an Avro source called avro-source1 on agent1 and tell it to bind to 
>>0.0.0.0:41414. Connect it to channel ch1.
>>agent1.sources = avro-source1
>>agent1.sources.avro-source1.channels = ch1
>>agent1.sources.avro-source1.type = avro
>>agent1.sources.avro-source1.bind = 0.0.0.0
>>agent1.sources.avro-source1.port = 41414
>>
>># Define a local file sink that simply logs all events it receives (this 
>>works well)
>>#agent1.sinks = localout
>>#agent1.sinks.localout.type = file_roll
>>#agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
>>#agent1.sinks.localout.sink.rollInterval = 0
>>#agent1.sinks.localout.channel = ch1
>>
>># Define ElasticSearchSink sink (this does not work)
>>agent1.sinks = k1
>>agent1.sinks.k1.type =
 org.apache.flume.sink.elasticsearch.ElasticSearchSink
>>agent1.sinks.k1.hostNames = localhost:9300
>>agent1.sinks.k1.indexName = flume
>>agent1.sinks.k1.indexType = logs
>>agent1.sinks.k1.clusterName = elasticsearch
>>agent1.sinks.k1.batchSize = 2
>>agent1.sinks.k1.serializer = 
>>org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
>>agent1.sinks.k1.channel = ch1
>>
>
>
>   
>
>   

Reply via email to