Can you try setting this config param for your HDFS Sink:  
hdfs.useLocalTimeStamp = true 

This should insert the timestamp at the sink into the event (this may not be 
what you want - but this will get rid of the event from the channel). 
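
For reference, a minimal sketch of where that property would go. The agent, 
channel, and sink names (a1, c1, k1) and the path are placeholders, not taken 
from your setup. As far as I know, the setting makes the sink use its own local 
time, instead of the timestamp header, when resolving escapes like %Y-%m-%d:

```properties
# Hypothetical names: agent a1, channel c1, sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# Time escapes in the path normally need a "timestamp" header on each event
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d
# Resolve the escapes from the sink's local clock instead of the header
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```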


Thanks,
Hari


On Wednesday, August 7, 2013 at 7:14 AM, Jonathan Cooper-Ellis wrote:

> You can use a Static Interceptor before the RegexExtractor to add a timestamp 
> of zero to the header, which can then be overwritten by the proper timestamp 
> (if it exists). It also should sink misses into an obvious 'miss' directory.
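> 
> A rough sketch of that interceptor chain, in case it helps. The agent and 
> source names (a1, r1), the regex, and the date pattern are placeholders and 
> would need to match the real log format. With a timestamp of 0, a path such 
> as /flume/events/%Y-%m-%d should resolve to a 1970-01-01 directory (give or 
> take the time zone), which serves as the 'miss' directory:
> 
> ```properties
> # Hypothetical names: agent a1, source r1; interceptors run in listed order
> a1.sources.r1.interceptors = i1 i2
> # i1: give every event a default timestamp header of 0
> a1.sources.r1.interceptors.i1.type = static
> a1.sources.r1.interceptors.i1.key = timestamp
> a1.sources.r1.interceptors.i1.value = 0
> # i2: overwrite the header when the (placeholder) regex matches the line
> a1.sources.r1.interceptors.i2.type = regex_extractor
> a1.sources.r1.interceptors.i2.regex = ^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
> a1.sources.r1.interceptors.i2.serializers = s1
> a1.sources.r1.interceptors.i2.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
> a1.sources.r1.interceptors.i2.serializers.s1.name = timestamp
> a1.sources.r1.interceptors.i2.serializers.s1.pattern = yyyy-MM-dd HH:mm
> ```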
> 
> 
> On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <[email protected] 
> (mailto:[email protected])> wrote:
> > After some reading in the docs I think the existing fail-over behavior 
> > can't be used to solve the 'poison' message problem, as it puts the 'failed' 
> > sink in a 'cooldown' period.
> > As the problem is in the message and not the sink, it means that after a 
> > poison message has arrived, the HDFS sink will 'fail' and thus the next X 
> > messages will go to the failover sink.
> > My only solution for now is to avoid my current problem and hope that I 
> > won't have any other problematic messages; I'd be glad to have a less 
> > fragile solution.
> > 
> > Many thanks!
> > Other than that, Flume looks like a great tool :-)
> > 
> > Anat
> > 
> > 
> > On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <[email protected] 
> > (mailto:[email protected])> wrote:
> > > I think using a fail-over processor is a very good idea, I think I'll use 
> > > it as an immediate solution.
> > > For the long run, I would like to see a general solution (not specific to 
> > > file channel, in my case it is an HDFS sink), so the suggestion to add 
> > > 'Poison Message' sink to the sink processor sounds good. 
> > > 
> > > Just FYI, my problem is that a log file going through my source did not 
> > > have (in all rows) the structure I expected.
> > > 
> > > Since I used a regexp extractor to set the timestamp, the 'bad' row didn't 
> > > match the regexp and the timestamp was not set, so the HDFS sink threw an 
> > > NPE:
> > > 01 Aug 2013 09:36:24,259 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
> > > java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
> > >         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
> > >         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
> > >         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
> > >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
> > >         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > >         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > >         at java.lang.Thread.run(Thread.java:722)
> > > 01 Aug 2013 09:36:24,262 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
> > > org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
> > >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
> > >         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > >         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > >         at java.lang.Thread.run(Thread.java:722)
> > > Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
> > >         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
> > >         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
> > >         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
> > >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
> > >         ... 3 more
> > > 
> > > 
> > > I have fixed my regexp now; still, I can never be sure that all the log 
> > > files the system creates will be free of bad lines.
> > > 
> > > 
> > > On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <[email protected] 
> > > (mailto:[email protected])> wrote:
> > > > Just some more thoughts.
> > > > 
> > > > The whole setup might be even easier: give the failover sink processor 
> > > > "max attempts" and "time between attempts" settings, so that it tries 
> > > > one event X times before it gives up and sends it to the next sink. The 
> > > > time between attempts could even back off if needed. 
> > > > 
> > > > The advantage of this is that it preserves the ordering of events, 
> > > > something which gets completely broken in the previous scenario.
> > > > 
> > > > - Connor
> > > > 
> > > > 
> > > > On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <[email protected] 
> > > > (mailto:[email protected])> wrote:
> > > > > As another option to solve the problem of having a bad event in a 
> > > > > channel: using a fail-over sink processor, log all bad events to a 
> > > > > local file. And to be extra cautious, add a third failover of a null 
> > > > > sink. This will mean that events will always flow through your 
> > > > > channel. The file sink should almost never fail, so you shouldn't be 
> > > > > losing events in the process. And then you can re-process everything 
> > > > > in the file if you still want those events for something. 
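> > > > > 
> > > > > A sketch of what such a sink group could look like. All names (a1, 
> > > > > c1, g1, k1 to k3) and paths are placeholders, and the priority values 
> > > > > are arbitrary; the sink with the highest priority is tried first. 
> > > > > Keep in mind that a sink that fails is penalized for a while, so 
> > > > > events following a bad one may also end up in the local file:
> > > > > 
> > > > > ```properties
> > > > > # Hypothetical names: agent a1, channel c1, sink group g1
> > > > > a1.sinks = k1 k2 k3
> > > > > a1.sinkgroups = g1
> > > > > a1.sinkgroups.g1.sinks = k1 k2 k3
> > > > > a1.sinkgroups.g1.processor.type = failover
> > > > > a1.sinkgroups.g1.processor.priority.k1 = 10
> > > > > a1.sinkgroups.g1.processor.priority.k2 = 5
> > > > > a1.sinkgroups.g1.processor.priority.k3 = 1
> > > > > # Upper bound (ms) on the back-off applied to a failed sink
> > > > > a1.sinkgroups.g1.processor.maxpenalty = 10000
> > > > > # k1: the real destination
> > > > > a1.sinks.k1.type = hdfs
> > > > > a1.sinks.k1.channel = c1
> > > > > a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d
> > > > > # k2: local file for events that k1 could not take
> > > > > a1.sinks.k2.type = file_roll
> > > > > a1.sinks.k2.channel = c1
> > > > > a1.sinks.k2.sink.directory = /var/log/flume/bad-events
> > > > > # k3: last resort, discards the event so the channel keeps draining
> > > > > a1.sinks.k3.type = null
> > > > > a1.sinks.k3.channel = c1
> > > > > ```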
> > > > > 
> > > > > For the system of having Flume detect bad events, I think 
> > > > > implementing something like above is better than discarding events 
> > > > > that fail X times. For instance, if you have an Avro sink -> Avro 
> > > > > source, and you're restarting your source, Flume would end up 
> > > > > discarding events unnecessarily. Instead, how about implementing the 
> > > > > above system and then going a step further: Flume will attempt to 
> > > > > re-send the bad events itself. Then, if a bad event can't be sent 
> > > > > after X attempts, it can be discarded. 
> > > > > 
> > > > > I envision this system as an extension to the current File Channel; 
> > > > > when an event fails, it is written to a secondary File Channel from 
> > > > > which events can be pulled when the main channel isn't in use. It 
> > > > > would add headers like "lastAttempt" and "numberOfAttempts" to 
> > > > > events. Then it can be configurable for a "min time between attempts" 
> > > > > and "maximum attempts." When an event fails the second time, those 
> > > > > headers are updated and it goes back into the fail-channel. If it 
> > > > > comes out of the fail-channel but the lastAttempt is too recent, it 
> > > > > goes back in. If it fails more times than the maximum, it is written 
> > > > > to a final location (perhaps it's just sent to another sink; maybe 
> > > > > this would have to be in a sink processor). Assuming all of those 
> > > > > steps are error-free, then all messages are preserved, and the 
> > > > > badly-formatted ones eventually get stored somewhere else. (This system 
> > > > > could be hacked together with current code - fail over sink processor 
> > > > > -> avro sink -> avro source on same instance, but that's a little too 
> > > > > hacky). 
> > > > > 
> > > > > Just some thoughts.
> > > > > 
> > > > > - Connor
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <[email protected] 
> > > > > (mailto:[email protected])> wrote:
> > > > > > This sounds like a critical problem that can cause pipelines to 
> > > > > > block permanently. If you find yourself in this situation, a 
> > > > > > possible workaround would be to decommission the channel, remove 
> > > > > > its data, and route the flow through a new, empty channel. If you 
> > > > > > are able to identify which component is causing the problem, see if 
> > > > > > you can remove it temporarily to let the problem events pass 
> > > > > > through another peer component.
> > > > > > 
> > > > > > I have also created FLUME-2140 [1] which will eventually allow the 
> > > > > > pipelines to identify and divert such bad events. If you have any 
> > > > > > logs, data, or configurations that can be shared and would help 
> > > > > > provide more details on this problem, it would be great if you could 
> > > > > > attach them to this Jira and add your comments. 
> > > > > > 
> > > > > > [1] https://issues.apache.org/jira/browse/FLUME-2140 
> > > > > > 
> > > > > > Regards,
> > > > > > Arvind Prabhakar
> > > > > > 
> > > > > > On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez 
> > > > > > <[email protected] 
> > > > > > (mailto:[email protected])> wrote:
> > > > > > > There's no way to deal with a bad event once it's in the channel, 
> > > > > > > but you can mitigate future issues by having a timestamp 
> > > > > > > interceptor bound to the source feeding the channel. There is a 
> > > > > > > parameter 'preserveExisting' that will only add the header if it 
> > > > > > > doesn't exist. If you don't want to have 'bad' time data in there 
> > > > > > > you could try a static interceptor with a specific past date so 
> > > > > > > that corrupt events fall into a deterministic path in HDFS. 
> > > > > > >  
> > > > > > > I use this technique to prevent stuck events for both timestamp 
> > > > > > > headers as well as some of our own custom headers we use for 
> > > > > > > tokenized paths. The static interceptor will insert an arbitrary 
> > > > > > > header if it doesn't exist so I have a couple that put in the 
> > > > > > > value 'Unknown' so that I can still send the events through the 
> > > > > > > HDFS sink but I can also find them later if need be.
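> > > > > > > 
> > > > > > > A sketch of such a setup, with made-up names (agent a1, source 
> > > > > > > r1, interceptors ts and tok) and a hypothetical 'customer' header 
> > > > > > > standing in for a custom header used in tokenized paths:
> > > > > > > 
> > > > > > > ```properties
> > > > > > > # Hypothetical names: agent a1, source r1
> > > > > > > a1.sources.r1.interceptors = ts tok
> > > > > > > # Add a timestamp header only when the event has none
> > > > > > > a1.sources.r1.interceptors.ts.type = timestamp
> > > > > > > a1.sources.r1.interceptors.ts.preserveExisting = true
> > > > > > > # Default a custom header to 'Unknown' when it is missing
> > > > > > > a1.sources.r1.interceptors.tok.type = static
> > > > > > > a1.sources.r1.interceptors.tok.preserveExisting = true
> > > > > > > a1.sources.r1.interceptors.tok.key = customer
> > > > > > > a1.sources.r1.interceptors.tok.value = Unknown
> > > > > > > ```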
> > > > > > >  
> > > > > > > hope that helps,
> > > > > > > Paul Chavez
> > > > > > > 
> > > > > > > From: Roshan Naik [mailto:[email protected]] 
> > > > > > > Sent: Thursday, August 01, 2013 10:27 AM
> > > > > > > To: [email protected] (mailto:[email protected])
> > > > > > > Subject: Re: Problem Events
> > > > > > > 
> > > > > > > Some questions: 
> > > > > > > - Why is the sink unable to consume the event?
> > > > > > > - How would you like to identify such an event? By examining its 
> > > > > > > content? Or by the fact that it's ping-ponging between channel 
> > > > > > > and sink?
> > > > > > > - What would you prefer to do with such an event? Merely drop it?
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson 
> > > > > > > <[email protected] (mailto:[email protected])> wrote:
> > > > > > > > To my knowledge (which is admittedly limited), there is no way 
> > > > > > > > to deal with these in a way that will make your day.  I'm happy 
> > > > > > > > if someone can say otherwise. 
> > > > > > > > 
> > > > > > > > This is very similar to a problem I had a week or two ago.  I 
> > > > > > > > fixed it by restarting Flume with debugging on, connecting to 
> > > > > > > > it with the debugger, and finding the message in the sink.  
> > > > > > > > Discovered a bug in the sink.  Downloaded Flume, fixed bug, 
> > > > > > > > recompiled, installed custom version, etc. 
> > > > > > > > 
> > > > > > > > I agree that this is not a practical solution, and I still 
> > > > > > > > believe that Flume needs some sort of "sink of last resort" 
> > > > > > > > option or something, like JMS implementations. 
> > > > > > > > 
> > > > > > > > -- Jeremy 
> > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > > > On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <[email protected] 
> > > > > > > > (mailto:[email protected])> wrote:
> > > > > > > > > The message is already in the channel.
> > > > > > > > > Is there a way to write an interceptor to work after the 
> > > > > > > > > channel? or before the sink?
> > > > > > > > > 
> > > > > > > > > The only thing I found is to stop everything and delete the 
> > > > > > > > > channel files, but I won't be able to use this approach in 
> > > > > > > > > production :-(
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > On Thu, Aug 1, 2013 at 11:13 AM, Ashish 
> > > > > > > > > <[email protected] (mailto:[email protected])> 
> > > > > > > > > wrote:
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon 
> > > > > > > > > > <[email protected] (mailto:[email protected])> wrote:
> > > > > > > > > > > Hi,
> > > > > > > > > > > 
> > > > > > > > > > > I'm having the same problem with HDFS sink.
> > > > > > > > > > > 
> > > > > > > > > > > A 'poison' message doesn't have the timestamp header that 
> > > > > > > > > > > the sink expects.
> > > > > > > > > > > This causes an NPE, which ends in the message being 
> > > > > > > > > > > returned to the channel, over and over again.
> > > > > > > > > > > 
> > > > > > > > > > > Is my only option to rewrite the HDFS sink?
> > > > > > > > > > > Isn't there any way to intercept the sink's work?
> > > > > > > > > > 
> > > > > > > > > > You can write a custom interceptor and remove/modify the 
> > > > > > > > > > poison message. 
> > > > > > > > > > 
> > > > > > > > > > Interceptors are called before a message makes its way into 
> > > > > > > > > > the channel. 
> > > > > > > > > > 
> > > > > > > > > > http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
> > > > > > > > > > 
> > > > > > > > > > I wrote a blog about it a while back 
> > > > > > > > > > http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
> > > > > > > > > >  
> > > > > > > > > > 
> > > > > > > > > >  
> > > > > > > > > > > 
> > > > > > > > > > > Thanks
> > > > > > > > > > > Anat
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar 
> > > > > > > > > > > <[email protected] (mailto:[email protected])> wrote:
> > > > > > > > > > > > Sounds like a bug in ElasticSearch sink to me. Do you 
> > > > > > > > > > > > mind filing a Jira to track this? Sample data to cause 
> > > > > > > > > > > > this would be even better. 
> > > > > > > > > > > > 
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Arvind Prabhakar 
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson 
> > > > > > > > > > > > <[email protected] 
> > > > > > > > > > > > (mailto:[email protected])> wrote:
> > > > > > > > > > > > > This was using the provided ElasticSearch sink.  The 
> > > > > > > > > > > > > logs were not helpful.  I ran it through with the 
> > > > > > > > > > > > > debugger and found the source of the problem. 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > ContentBuilderUtil uses a very "aggressive" method to 
> > > > > > > > > > > > > determine if the content is JSON; if it contains a 
> > > > > > > > > > > > > "{" anywhere in it, it's considered JSON.  My body 
> > > > > > > > > > > > > contained that but wasn't JSON, causing the JSON 
> > > > > > > > > > > > > parser to throw a CharConversionException from 
> > > > > > > > > > > > > addComplexField(...) (but not the expected 
> > > > > > > > > > > > > JSONException).  We've changed addComplexField(...) 
> > > > > > > > > > > > > to catch different types of exceptions and fall back 
> > > > > > > > > > > > > to treating it as a simple field.  We'll probably 
> > > > > > > > > > > > > submit a patch for this soon. 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I'm reasonably happy with this, but I still think 
> > > > > > > > > > > > > that in the bigger picture there should be some sort 
> > > > > > > > > > > > > of mechanism to automatically detect and toss / skip 
> > > > > > > > > > > > > / flag problematic events without them plugging up 
> > > > > > > > > > > > > the flow. 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > -- Jeremy 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar 
> > > > > > > > > > > > > <[email protected] (mailto:[email protected])> wrote:
> > > > > > > > > > > > > > Jeremy, would it be possible for you to show us 
> > > > > > > > > > > > > > logs for the part where the sink fails to remove an 
> > > > > > > > > > > > > > event from the channel? I am assuming this is a 
> > > > > > > > > > > > > > standard sink that Flume provides and not a custom 
> > > > > > > > > > > > > > one. 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > The reason I ask is because sinks do not introspect 
> > > > > > > > > > > > > > the event, and hence there is no reason why it will 
> > > > > > > > > > > > > > fail during the event's removal. It is more likely 
> > > > > > > > > > > > > > that there is a problem within the channel in that 
> > > > > > > > > > > > > > it cannot dereference the event correctly. Looking 
> > > > > > > > > > > > > > at the logs will help us identify the root cause 
> > > > > > > > > > > > > > for what you are experiencing. 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Regards, 
> > > > > > > > > > > > > > Arvind Prabhakar 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson 
> > > > > > > > > > > > > > <[email protected] 
> > > > > > > > > > > > > > (mailto:[email protected])> wrote:
> > > > > > > > > > > > > > > Both reasonable suggestions.  What would a custom 
> > > > > > > > > > > > > > > sink look like in this case, and how would I only 
> > > > > > > > > > > > > > > eliminate the problem events since I don't know 
> > > > > > > > > > > > > > > what they are until they are attempted by the 
> > > > > > > > > > > > > > > "real" sink? 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > My philosophical concern (in general) is that 
> > > > > > > > > > > > > > > we're taking the approach of exhaustively finding 
> > > > > > > > > > > > > > > and eliminating possible failure cases.  It's not 
> > > > > > > > > > > > > > > possible to eliminate every single failure case, 
> > > > > > > > > > > > > > > so shouldn't there be a method of last resort to 
> > > > > > > > > > > > > > > eliminate problem events from the channel? 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > -- Jeremy 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan 
> > > > > > > > > > > > > > > <[email protected] 
> > > > > > > > > > > > > > > (mailto:[email protected])> wrote:
> > > > > > > > > > > > > > > > Or you could write a custom sink that removes 
> > > > > > > > > > > > > > > > this event (more work of course) 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Thanks, 
> > > > > > > > > > > > > > > > Hari
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > On Wednesday, July 24, 2013 at 3:36 PM, Roshan 
> > > > > > > > > > > > > > > > Naik wrote:
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > if you have a way to identify such events.. 
> > > > > > > > > > > > > > > > > you may be able to use the Regex interceptor 
> > > > > > > > > > > > > > > > > to toss them out before they get into the 
> > > > > > > > > > > > > > > > > channel.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > On Wed, Jul 24, 2013 at 2:52 PM, Jeremy 
> > > > > > > > > > > > > > > > > Karlson <[email protected] 
> > > > > > > > > > > > > > > > > (mailto:[email protected])> wrote:
> > > > > > > > > > > > > > > > > > Hi everyone.  My Flume adventures continue. 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I'm in a situation now where I have a 
> > > > > > > > > > > > > > > > > > channel that's filling because a stubborn 
> > > > > > > > > > > > > > > > > > message is stuck.  The sink won't accept it 
> > > > > > > > > > > > > > > > > > (for whatever reason; I can go into detail 
> > > > > > > > > > > > > > > > > > but that's not my point here).  This just 
> > > > > > > > > > > > > > > > > > blocks up the channel entirely, because it 
> > > > > > > > > > > > > > > > > > goes back into the channel when the sink 
> > > > > > > > > > > > > > > > > > refuses.  Obviously, this isn't ideal. 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I'm wondering what mechanisms, if any, 
> > > > > > > > > > > > > > > > > > Flume has to deal with these situations.  
> > > > > > > > > > > > > > > > > > Things that come to mind might be: 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 1. Ditch the event after n attempts. 
> > > > > > > > > > > > > > > > > > 2. After n attempts, send the event to a 
> > > > > > > > > > > > > > > > > > "problem area" (maybe a different source / 
> > > > > > > > > > > > > > > > > > sink / channel?)  that someone can look at 
> > > > > > > > > > > > > > > > > > later.
> > > > > > > > > > > > > > > > > > 3. Some sort of mechanism that allows 
> > > > > > > > > > > > > > > > > > operators to manually kill these messages.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I'm open to suggestions on alternatives as 
> > > > > > > > > > > > > > > > > > well. 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Thanks. 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > -- Jeremy
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > -- 
> > > > > > > > > > thanks
> > > > > > > > > > ashish
> > > > > > > > > > 
> > > > > > > > > > Blog: http://www.ashishpaliwal.com/blog
> > > > > > > > > > My Photo Galleries: http://www.pbase.com/ashishpaliwal 
> > > > > > > > 
> > > > > > > 
> > > > > > 
> > > > > 
> > > > 
> > > 
> > 
> 
