from the README you need to have following things in conf The Sink expects several flume event headers to be present:
- key - used (combined with src) to create the Cassandra row key. It should be generated by the application doing the logging - timestamp - timestamp of when the log occurred, not necessarily when the flume event is created - src - A logical source of the flume event. Could be host, but probably you will have many hosts for a source. A more likely candidate for source is the name of the application - host - the name of the host where the message was generated - On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain <[email protected]>wrote: > HI, > > I am working on the POC of Cassandra flume integration. For that am > using Cassandra sink plugin from *Github (flume-ng Cassandra sink plugin). > * > *And * > *Flume-NG version-1.2.0* > *Apache Cassandra Version :1.1.5* > *I *have build the jar using maven and am using sink configuration as > below in flume.conf.cassandra in conf directory... > > *agent.sources = avrosource* > > *agent.channels = channel1* > > *agent.sinks = cassandraSink* > > * * > > *#source defination* > > *agent.sources.avrosource.channels = channel1* > > *agent.sources.avrosource.type = exec* > > *agent.sources.avrosource.command = tail -f > /home/user/priyanka/flume-ng/flnginput.txt* > > * * > > *#agent.sources.avrosource.type = avro* > > *#agent.sources.avrosource.channels = channel1* > > *#agent.sources.avrosource.bind =127.0.0.1* > > *#agent.sources.avrosource.port =41414* > > * * > > *#Flume header event* > > *agent.sources.avrosource.interceptors = addHost* > > *agent.sources.avrosource.interceptors.addHost.type = > org.apache.flume.interceptor.HostInterceptor$Builder* > > *agent.sources.avrosource.interceptors.addHost.preserveExisting = false* > > *agent.sources.avrosource.interceptors.addHost.useIP = false* > > *agent.sources.avrosource.interceptors.addHost.hostHeader = host* > > *agent.sources.avrosource.interceptors = addTimestamp* > > *agent.sources.avrosource.interceptors.addTimestamp.type = > org.apache.flume.interceptor.TimestampInterceptor$Builder* > > * * > > *# Cassandra flow* > > *agent.channels.channel1.type = FILE* > > *agent.channels.channel1.checkpointDir = file-channel1/check* > > *agent.channels.channel1.dataDirs = file-channel1/data* > > * * > > *agent.sinks.cassandraSink.channel = channel1* > > *agent.sinks.cassandraSink.type = > com.btoddb.flume.sinks.cassandra.CassandraSink* > > *agent.sinks.cassandraSink.hosts = localhost* > > *agent.sinks.cassandraSink.port = 9160* > > *agent.sinks.cassandraSink.keyspace-name = logs* > > *agent.sinks.cassandraSink.records-colfam = records* > > * > * > > * > * > > * > * > > *Am running this using the command :-* > > * > * > > flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f > /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra > -Dflume.root.logger=DEBUG,console > > > Got the error while running :- > > * * > > 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) > [ERROR - > com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] > exception while processing in Cassandra Sink > > java.lang.IllegalArgumentException: Missing flume header attribute, 'key' > - cannot process this event > > at > com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125) > > at > com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166) > > at > org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) > > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) > > atjava.lang.Thread.run(Thread.java:722) > > 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) > [ERROR - > org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable > to deliver event. Exception follows. > > org.apache.flume.EventDeliveryException: Failed to persist message > > at > com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194) > > at > org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) > > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) > > atjava.lang.Thread.run(Thread.java:722) > > Caused by: java.lang.IllegalArgumentException: Missing flume header > attribute, 'key' - cannot process this event > > at > com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125) > > at > com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166) > > ... 3 more > > I got one solution as Key is Src+Key but am not getting how to configure > it. > So can any one please help me out to solve this problem. > So > > -- Nitin Pawar
