Hi,
I am new to Flume. I am trying to send data using Flume Client perl API to
Flume Avro source then ElasticSearchSink to an ElasticSearch engine. I could
make the file_roll sink to work by sending the data to some file. However, I am
encountering issue with ElasticSearchSink. The data did not go through to
ElasticSearch engine:
use Flume::Client;
my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name',
port => 41414);
my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers =>
{}, body => "hello, this is sent from perl (using FlumeNG)"}]);
print "$response\n"; # response will be 'OK' on success
since the returned $response is not defined (again this worked when file_roll
sink was used).
The ElasticSearch engine is working since I could load data to it via
LogStash's EalsticSearch output type.
The Flume agent was installed by downloading tarball from Cloudera:
http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz
The flume.conf is as followings. I played around the "hostNames" with full
name, IP address, etc. Do not see output message in flume.log. Could someone
let me know what could potentially cause the issue?
Thanks.
Shushuai
# Define a memory channel called ch1 on agent1
agent1.channels = ch1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1 and tell it to bind to
0.0.0.0:41414. Connect it to channel ch1.
agent1.sources = avro-source1
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
# Define a local file sink that simply logs all events it receives (this works
well)
#agent1.sinks = localout
#agent1.sinks.localout.type = file_roll
#agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
#agent1.sinks.localout.sink.rollInterval = 0
#agent1.sinks.localout.channel = ch1
# Define ElasticSearchSink sink (this does not work)
agent1.sinks = k1
agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.k1.hostNames = localhost:9300
agent1.sinks.k1.indexName = flume
agent1.sinks.k1.indexType = logs
agent1.sinks.k1.clusterName = elasticsearch
agent1.sinks.k1.batchSize = 2
agent1.sinks.k1.serializer =
org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
agent1.sinks.k1.channel = ch1