On Thu, Dec 27, 2012 at 12:22 PM, Priyanka Jain <[email protected]> wrote:
> Hi,
>
> I am sending some log output for the error in the attached file.
>
> On Wed, Dec 26, 2012 at 11:50 PM, Nitin Pawar <[email protected]> wrote:
>
>> Can you provide a sample line from your log?
>>
>> The Cassandra sink you are using looks at particular event headers in
>> your log, such as key, src and host.
>>
>>
>> On Wed, Dec 26, 2012 at 11:35 PM, Priyanka jain <[email protected]> wrote:
>>
>>> Hi Nitin,
>>> Thanks for your suggestion.
>>>
>>> I have done everything as described in the README, but I cannot work
>>> out where to set that key.
>>> Can you please give me an idea of where I can configure it, or where
>>> it gets generated?
>>>
>>>
>>> On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <[email protected]> wrote:
>>>
>>>> From the README,
>>>> you need to have the following things in conf:
>>>>
>>>> The Sink expects several flume event headers to be present:
>>>>
>>>> - key - used (combined with src) to create the Cassandra row key.
>>>>   It should be generated by the application doing the logging
>>>> - timestamp - timestamp of when the log occurred, not necessarily
>>>>   when the flume event is created
>>>> - src - A logical source of the flume event. Could be host, but
>>>>   probably you will have many hosts for a source. A more likely
>>>>   candidate for source is the name of the application
>>>> - host - the name of the host where the message was generated
>>>>
>>>>
>>>> On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am working on a POC of Cassandra and Flume integration. For that I
>>>>> am using the Cassandra sink plugin from GitHub (flume-ng Cassandra
>>>>> sink plugin), with:
>>>>> Flume-NG version: 1.2.0
>>>>> Apache Cassandra version: 1.1.5
>>>>> I have built the jar using Maven and am using the sink configuration
>>>>> below in flume.conf.cassandra in the conf directory:
>>>>>
>>>>> agent.sources = avrosource
>>>>> agent.channels = channel1
>>>>> agent.sinks = cassandraSink
>>>>>
>>>>> #source definition
>>>>> agent.sources.avrosource.channels = channel1
>>>>> agent.sources.avrosource.type = exec
>>>>> agent.sources.avrosource.command = tail -f /home/user/priyanka/flume-ng/flnginput.txt
>>>>>
>>>>> #agent.sources.avrosource.type = avro
>>>>> #agent.sources.avrosource.channels = channel1
>>>>> #agent.sources.avrosource.bind =127.0.0.1
>>>>> #agent.sources.avrosource.port =41414
>>>>>
>>>>> #Flume header event
>>>>> agent.sources.avrosource.interceptors = addHost
>>>>> agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
>>>>> agent.sources.avrosource.interceptors.addHost.preserveExisting = false
>>>>> agent.sources.avrosource.interceptors.addHost.useIP = false
>>>>> agent.sources.avrosource.interceptors.addHost.hostHeader = host
>>>>> agent.sources.avrosource.interceptors = addTimestamp
>>>>> agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>>>>>
>>>>> # Cassandra flow
>>>>> agent.channels.channel1.type = FILE
>>>>> agent.channels.channel1.checkpointDir = file-channel1/check
>>>>> agent.channels.channel1.dataDirs = file-channel1/data
>>>>>
>>>>> agent.sinks.cassandraSink.channel = channel1
>>>>> agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
>>>>> agent.sinks.cassandraSink.hosts = localhost
>>>>> agent.sinks.cassandraSink.port = 9160
>>>>> agent.sinks.cassandraSink.keyspace-name = logs
>>>>> agent.sinks.cassandraSink.records-colfam = records
>>>>>
>>>>> I am running this using the command:
>>>>>
>>>>> flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra -Dflume.root.logger=DEBUG,console
>>>>>
>>>>> I got this error while running:
>>>>>
>>>>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] exception while processing in Cassandra Sink
>>>>> java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
>>>>>     at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
>>>>>     at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
>>>>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>>     at java.lang.Thread.run(Thread.java:722)
>>>>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
>>>>> org.apache.flume.EventDeliveryException: Failed to persist message
>>>>>     at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194)
>>>>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>>     at java.lang.Thread.run(Thread.java:722)
>>>>> Caused by: java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
>>>>>     at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
>>>>>     at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
>>>>>     ... 3 more
>>>>>
>>>>> I found one explanation that the row key is src + key, but I cannot
>>>>> work out how to configure it.
>>>>> Can anyone please help me solve this problem?
>>>>>
>>>>
>>>> --
>>>> Nitin Pawar
>>>
>>
>> --
>> Nitin Pawar
>
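
The README excerpt quoted above says the key header should be generated by the application doing the logging, and the Host and Timestamp interceptors in the posted config only set host and timestamp. As a rough illustration of what a header-adding step would have to do before the sink sees an event, here is a minimal Java sketch. HeaderDefaults and withKeyAndSrc are hypothetical names, not part of Flume or of the Cassandra sink, and using a random UUID as the key is only an assumption about what counts as a suitable unique value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper (not part of Flume or the sink): fills in the
// 'key' and 'src' headers that the sink's README says it expects.
class HeaderDefaults {

    // Returns a copy of the headers with 'key' and 'src' present.
    // Existing values are kept; missing ones are filled in.
    static Map<String, String> withKeyAndSrc(Map<String, String> headers, String src) {
        Map<String, String> out = new HashMap<>(headers);
        if (!out.containsKey("key")) {
            // Assumption: any value unique per event is acceptable as a key.
            out.put("key", UUID.randomUUID().toString());
        }
        if (!out.containsKey("src")) {
            // 'src' is the logical source, e.g. the application name.
            out.put("src", src);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "localhost");
        // "myapp" is a made-up application name for illustration.
        System.out.println(withKeyAndSrc(headers, "myapp"));
    }
}
```

In a real agent this logic would have to live either in the application that sends the events (for example over the commented-out avro source) or in a custom class implementing org.apache.flume.interceptor.Interceptor; neither is shown in this thread. Separately, the posted config assigns agent.sources.avrosource.interceptors twice, and in a properties file the later assignment replaces the earlier one, so listing both names on one line (addHost addTimestamp) is probably what was intended.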
