Hi Shushuai,

I've had a similar issue, and in my case it was because I was using the same channel for multiple sinks. I believe whichever sink removes an event from the channel first gets to write it out, so each event ends up in only one place; I don't know the specifics, since I haven't had a chance to read through the codebase. If you add a second channel for your ElasticSearch sink and make sure your avro-source writes to both channels, you should see data going to both locations.
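As a rough sketch, the change to your flume.conf would look something like the following (untested; ch2 is just an illustrative name for the new channel, and "replicating" is the default selector anyway, so that line is optional):

```properties
# Two memory channels instead of one
agent1.channels = ch1 ch2
agent1.channels.ch1.type = memory
agent1.channels.ch2.type = memory

# The Avro source copies each event into both channels
agent1.sources.avro-source1.channels = ch1 ch2
agent1.sources.avro-source1.selector.type = replicating

# One sink per channel: file_roll drains ch2, ElasticSearch drains ch1
agent1.sinks = k1 localout
agent1.sinks.k1.channel = ch1
agent1.sinks.localout.channel = ch2
```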
Thanks,
Allan

On Tue, Jun 11, 2013 at 10:37 PM, shushuai zhu <[email protected]> wrote:
> Hi,
>
> I am new to Flume. I am trying to send data using the Flume Client perl API to
> a Flume Avro source, then through ElasticSearchSink to an ElasticSearch engine.
> I could make the file_roll sink work by sending the data to a file. However,
> I am encountering an issue with ElasticSearchSink. The data did not go through
> to the ElasticSearch engine:
>
> use Flume::Client;
> my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port => 41414);
> my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
> my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body => "hello, this is sent from perl (using FlumeNG)"}]);
> print "$response\n"; # response will be 'OK' on success
>
> since the returned $response is not defined (again, this worked when the
> file_roll sink was used).
>
> The ElasticSearch engine is working, since I could load data into it via
> LogStash's ElasticSearch output type.
>
> The Flume agent was installed by downloading the tarball from Cloudera:
>
> http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz
>
> The flume.conf is as follows. I played around with "hostNames" using the full
> host name, IP address, etc. I do not see any output message in flume.log.
> Could someone let me know what could potentially cause the issue?
>
> Thanks.
>
> Shushuai
>
>
> # Define a memory channel called ch1 on agent1
> agent1.channels = ch1
> agent1.channels.ch1.type = memory
>
> # Define an Avro source called avro-source1 on agent1 and tell it to bind
> # to 0.0.0.0:41414. Connect it to channel ch1.
> agent1.sources = avro-source1
> agent1.sources.avro-source1.channels = ch1
> agent1.sources.avro-source1.type = avro
> agent1.sources.avro-source1.bind = 0.0.0.0
> agent1.sources.avro-source1.port = 41414
>
> # Define a local file sink that simply logs all events it receives (this works well)
> #agent1.sinks = localout
> #agent1.sinks.localout.type = file_roll
> #agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
> #agent1.sinks.localout.sink.rollInterval = 0
> #agent1.sinks.localout.channel = ch1
>
> # Define ElasticSearchSink sink (this does not work)
> agent1.sinks = k1
> agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> agent1.sinks.k1.hostNames = localhost:9300
> agent1.sinks.k1.indexName = flume
> agent1.sinks.k1.indexType = logs
> agent1.sinks.k1.clusterName = elasticsearch
> agent1.sinks.k1.batchSize = 2
> agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
> agent1.sinks.k1.channel = ch1
