can you provide sample line out of your log? the cassandra sink you are using looks at a particular event headers in your log such as key, src and host
On Wed, Dec 26, 2012 at 11:35 PM, Priyanka jain <[email protected]>wrote: > Hi Nitin, > Thanks for your suggestion. > > I have done all the thing as in README. but am not getting from where I > can set that key. > can you please give me idea about from where I can configure it or from > where its get generated. > > > On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <[email protected]>wrote: > >> from the README >> you need to have following things in conf >> >> The Sink expects several flume event headers to be present: >> >> - key - used (combined with src) to create the Cassandra row key. It >> should be generated by the application doing the logging >> - timestamp - timestamp of when the log occurred, not necessarily >> when the flume event is created >> - src - A logical source of the flume event. Could be host, but >> probably you will have many hosts for a source. A more likely candidate >> for >> source is the name of the application >> - host - the name of the host where the message was generated >> - >> >> >> >> On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain < >> [email protected]> wrote: >> >>> HI, >>> >>> I am working on the POC of Cassandra flume integration. For that am >>> using Cassandra sink plugin from *Github (flume-ng Cassandra sink >>> plugin).* >>> *And * >>> *Flume-NG version-1.2.0* >>> *Apache Cassandra Version :1.1.5* >>> *I *have build the jar using maven and am using sink configuration as >>> below in flume.conf.cassandra in conf directory... >>> >>> *agent.sources = avrosource* >>> >>> *agent.channels = channel1* >>> >>> *agent.sinks = cassandraSink* >>> >>> * * >>> >>> *#source defination* >>> >>> *agent.sources.avrosource.channels = channel1* >>> >>> *agent.sources.avrosource.type = exec* >>> >>> *agent.sources.avrosource.command = tail -f >>> /home/user/priyanka/flume-ng/flnginput.txt* >>> >>> * * >>> >>> *#agent.sources.avrosource.type = avro* >>> >>> *#agent.sources.avrosource.channels = channel1* >>> >>> *#agent.sources.avrosource.bind =127.0.0.1* >>> >>> *#agent.sources.avrosource.port =41414* >>> >>> * * >>> >>> *#Flume header event* >>> >>> *agent.sources.avrosource.interceptors = addHost* >>> >>> *agent.sources.avrosource.interceptors.addHost.type = >>> org.apache.flume.interceptor.HostInterceptor$Builder* >>> >>> *agent.sources.avrosource.interceptors.addHost.preserveExisting = false* >>> >>> *agent.sources.avrosource.interceptors.addHost.useIP = false* >>> >>> *agent.sources.avrosource.interceptors.addHost.hostHeader = host* >>> >>> *agent.sources.avrosource.interceptors = addTimestamp* >>> >>> *agent.sources.avrosource.interceptors.addTimestamp.type = >>> org.apache.flume.interceptor.TimestampInterceptor$Builder* >>> >>> * * >>> >>> *# Cassandra flow* >>> >>> *agent.channels.channel1.type = FILE* >>> >>> *agent.channels.channel1.checkpointDir = file-channel1/check* >>> >>> *agent.channels.channel1.dataDirs = file-channel1/data* >>> >>> * * >>> >>> *agent.sinks.cassandraSink.channel = channel1* >>> >>> *agent.sinks.cassandraSink.type = >>> com.btoddb.flume.sinks.cassandra.CassandraSink* >>> >>> *agent.sinks.cassandraSink.hosts = localhost* >>> >>> *agent.sinks.cassandraSink.port = 9160* >>> >>> *agent.sinks.cassandraSink.keyspace-name = logs* >>> >>> *agent.sinks.cassandraSink.records-colfam = records* >>> >>> * >>> * >>> >>> * >>> * >>> >>> * >>> * >>> >>> *Am running this using the command :-* >>> >>> * >>> * >>> >>> flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f >>> /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra >>> -Dflume.root.logger=DEBUG,console >>> >>> >>> Got the error while running :- >>> >>> * * >>> >>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) >>> [ERROR - >>> com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] >>> exception while processing in Cassandra Sink >>> >>> java.lang.IllegalArgumentException: Missing flume header attribute, >>> 'key' - cannot process this event >>> >>> at >>> com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125) >>> >>> at >>> com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166) >>> >>> at >>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) >>> >>> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) >>> >>> atjava.lang.Thread.run(Thread.java:722) >>> >>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) >>> [ERROR - >>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable >>> to deliver event. Exception follows. >>> >>> org.apache.flume.EventDeliveryException: Failed to persist message >>> >>> at >>> com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194) >>> >>> at >>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) >>> >>> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) >>> >>> atjava.lang.Thread.run(Thread.java:722) >>> >>> Caused by: java.lang.IllegalArgumentException: Missing flume header >>> attribute, 'key' - cannot process this event >>> >>> at >>> com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125) >>> >>> at >>> com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166) >>> >>> ... 3 more >>> >>> I got one solution as Key is Src+Key but am not getting how to configure >>> it. >>> So can any one please help me out to solve this problem. >>> So >>> >>> >> >> >> -- >> Nitin Pawar >> > > -- Nitin Pawar
