Hi Nitin,

Thanks for your suggestion. I have done everything described in the README, but I cannot figure out where to set that key. Could you please tell me where I can configure it, or where it gets generated?
On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <[email protected]> wrote:

> From the README, you need to have the following things in conf:
>
> The Sink expects several flume event headers to be present:
>
> - key - used (combined with src) to create the Cassandra row key. It should be generated by the application doing the logging
> - timestamp - timestamp of when the log occurred, not necessarily when the flume event is created
> - src - A logical source of the flume event. Could be host, but probably you will have many hosts for a source. A more likely candidate for source is the name of the application
> - host - the name of the host where the message was generated
>
> On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain <[email protected]> wrote:
>
>> Hi,
>>
>> I am working on a POC of Cassandra-Flume integration. For that I am using the Cassandra sink plugin from GitHub (flume-ng Cassandra sink plugin), with:
>>
>> Flume-NG version: 1.2.0
>> Apache Cassandra version: 1.1.5
>>
>> I have built the jar using Maven and am using the sink configuration below in flume.conf.cassandra in the conf directory...
>>
>> agent.sources = avrosource
>> agent.channels = channel1
>> agent.sinks = cassandraSink
>>
>> # source definition
>> agent.sources.avrosource.channels = channel1
>> agent.sources.avrosource.type = exec
>> agent.sources.avrosource.command = tail -f /home/user/priyanka/flume-ng/flnginput.txt
>>
>> #agent.sources.avrosource.type = avro
>> #agent.sources.avrosource.channels = channel1
>> #agent.sources.avrosource.bind =127.0.0.1
>> #agent.sources.avrosource.port =41414
>>
>> # Flume header event
>> agent.sources.avrosource.interceptors = addHost
>> agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
>> agent.sources.avrosource.interceptors.addHost.preserveExisting = false
>> agent.sources.avrosource.interceptors.addHost.useIP = false
>> agent.sources.avrosource.interceptors.addHost.hostHeader = host
>> agent.sources.avrosource.interceptors = addTimestamp
>> agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>>
>> # Cassandra flow
>> agent.channels.channel1.type = FILE
>> agent.channels.channel1.checkpointDir = file-channel1/check
>> agent.channels.channel1.dataDirs = file-channel1/data
>>
>> agent.sinks.cassandraSink.channel = channel1
>> agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
>> agent.sinks.cassandraSink.hosts = localhost
>> agent.sinks.cassandraSink.port = 9160
>> agent.sinks.cassandraSink.keyspace-name = logs
>> agent.sinks.cassandraSink.records-colfam = records
>>
>> I am running this using the command:
>>
>> flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra -Dflume.root.logger=DEBUG,console
>>
>> I got the following error while running:
>>
>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] exception while processing in Cassandra Sink
>> java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
>>     at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
>>     at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>     at java.lang.Thread.run(Thread.java:722)
>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
>> org.apache.flume.EventDeliveryException: Failed to persist message
>>     at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194)
>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>     at java.lang.Thread.run(Thread.java:722)
>> Caused by: java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
>>     at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
>>     at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
>>     ... 3 more
>>
>> I found one suggestion that the row key is src + key, but I cannot figure out how to configure it.
>> Can anyone please help me solve this problem?
>
> --
> Nitin Pawar
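
The README excerpt quoted above says the `key` header "should be generated by the application doing the logging", and nothing in the quoted configuration sets `key` or `src`. One possible approach, sketched below, is to fill in those headers before the event reaches the sink, for example from a custom Flume interceptor or from the client that sends the events. The sketch shows only the header logic on a plain `Map<String, String>`, which is the type Flume's `Event.getHeaders()` returns. The class name `EventKeyHeaders`, the method `addKeyHeaders`, the use of a random UUID as the key, and the `"myapp"` source name are all assumptions made for illustration; they are not part of Flume or of the Cassandra sink plugin.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical helper, not part of Flume or the Cassandra sink plugin.
 * It adds the "key" and "src" headers that the sink's README asks for.
 * Inside a custom interceptor it could be called as
 * addKeyHeaders(event.getHeaders(), "myapp").
 */
final class EventKeyHeaders {

    private EventKeyHeaders() {
    }

    /**
     * Adds "key" and "src" to the header map when they are absent.
     * Values that are already present are left untouched.
     */
    static Map<String, String> addKeyHeaders(Map<String, String> headers, String src) {
        if (!headers.containsKey("key")) {
            // Assumption: a random UUID is unique enough to serve as the key.
            headers.put("key", UUID.randomUUID().toString());
        }
        if (!headers.containsKey("src")) {
            // Assumption: src is a logical application name chosen by the user.
            headers.put("src", src);
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "localhost");
        addKeyHeaders(headers, "myapp");
        // Prints the header names now present on the event.
        System.out.println(headers.keySet());
    }
}
```

A random key is only one choice; an application-specific identifier may suit the row-key design better. Separately, the quoted configuration assigns `agent.sources.avrosource.interceptors` twice. In a properties file the later assignment normally replaces the earlier one, so listing both names on one line (`addHost addTimestamp`) may be what was intended.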
