After some reading in the docs, I think the existing fail-over behavior can't be used to solve the 'poison' message problem, because it puts the 'failed' sink into a 'cooldown' period. Since the problem is in the message and not in the sink, once a poison message has arrived the HDFS sink will 'fail', and thus the next X messages will go to the failover sink. My only solution for now is to work around my current problem and hope that I won't have any other problematic messages; I'd be glad to have a less fragile solution.

Many thanks! Other than that, Flume looks like a great tool :-)

Anat
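For reference, the 'cooldown' Anat describes is the failover sink processor's penalty mechanism. A minimal sketch of such a sink group, with hypothetical agent and sink names (a1, hdfsSink, backupSink):

    # Failover sink processor; the higher-priority sink is tried first.
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = hdfsSink backupSink
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.hdfsSink = 10
    a1.sinkgroups.g1.processor.priority.backupSink = 5
    # After any failure, hdfsSink is penalized for up to 30 seconds, so
    # subsequent, perfectly good events also drain to backupSink until the
    # penalty expires -- the mismatch Anat points out for poison messages.
    a1.sinkgroups.g1.processor.maxpenalty = 30000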
On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <[email protected]> wrote:

> I think using a fail-over processor is a very good idea; I'll use it as an immediate solution.
> For the long run, I would like to see a general solution (not specific to the file channel; in my case the failing component is an HDFS sink), so the suggestion to add a 'Poison Message' sink to the sink processor sounds good.
>
> Just FYI, my problem is that a log file going through my source did not have (in all rows) the structure I expected.
>
> Since I used the regexp extractor to set the timestamp, the 'bad' row didn't match the regexp, the timestamp header was not set, and the HDFS sink then threw an NPE on that:
>
> 01 Aug 2013 09:36:24,259 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:422) - process failed
> java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> 01 Aug 2013 09:36:24,262 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>         ... 3 more
>
> I fixed my regexp now; still, I can never be sure that all the log files the system creates will be perfect, without any bad lines.
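A regexp-extractor configuration of the kind Anat describes populates the 'timestamp' header that the HDFS sink's bucketing code requires. A minimal sketch, assuming hypothetical names (a1, r1) and an ISO-style log prefix rather than Anat's actual format:

    # regex_extractor interceptor that sets the 'timestamp' header.
    # Agent/source names and the regex are assumptions for illustration.
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_extractor
    a1.sources.r1.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})
    a1.sources.r1.interceptors.i1.serializers = s1
    a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
    a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
    a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss

Any row that fails the regex leaves the header unset, which is exactly what the checkNotNull in BucketPath then trips over in the trace above.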
> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <[email protected]> wrote:
>
>> Just some more thoughts. The whole setup might be even easier: for the failover sink processor, you would have settings like "max attempts" and "time between attempts", and it would just try one event X times before it gives up and sends it to the next sink. The time between attempts could even back off if needed.
>>
>> The advantage of this is that it preserves the ordering of events, something which gets completely broken in the previous scenario.
>>
>> - Connor
>>
>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <[email protected]> wrote:
>>
>>> As another option to solve the problem of having a bad event in a channel: using a fail-over sink processor, log all bad events to a local file. And to be extra cautious, add a third failover of a null sink. This will mean that events will always flow through your channel. The file sink should almost never fail, so you shouldn't be losing events in the process. And then you can re-process everything in the file if you still want those events for something.
>>>
>>> For the system of having Flume detect bad events, I think implementing something like the above is better than discarding events that fail X times. For instance, if you have an Avro sink -> Avro source and you're restarting your source, Flume would end up discarding events unnecessarily. Instead, how about implementing the above system and then going a step further: Flume will attempt to re-send the bad events itself, and if a bad event can't be sent after X attempts, it can be discarded.
>>>
>>> I envision this system as an extension to the current File Channel; when an event fails, it is written to a secondary File Channel from which events can be pulled when the main channel isn't in use. It would add headers like "lastAttempt" and "numberOfAttempts" to events. It could then be made configurable with a "min time between attempts" and a "maximum attempts." When an event fails a second time, those headers are updated and it goes back into the fail-channel. If it comes out of the fail-channel but its lastAttempt is too recent, it goes back in. If it fails more times than the maximum, it is written to a final location (perhaps it's just sent to another sink; maybe this would have to be in a sink processor). Assuming all of those steps are error-free, all messages are preserved, and the badly-formatted ones eventually get stored somewhere else. (This system could be hacked together with current code - failover sink processor -> Avro sink -> Avro source on the same instance - but that's a little too hacky.)
>>>
>>> Just some thoughts.
>>>
>>> - Connor
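Connor's first suggestion maps onto the failover sink processor that already exists. A minimal sketch, with hypothetical names throughout (a1, ch1, hdfsSink, localFileSink, nullSink) and paths invented for illustration:

    # Three sinks draining the same channel, tried in priority order.
    a1.sinks = hdfsSink localFileSink nullSink
    a1.sinks.hdfsSink.type = hdfs
    a1.sinks.hdfsSink.channel = ch1
    a1.sinks.hdfsSink.hdfs.path = hdfs://namenode/flume/events/%Y-%m-%d
    # Failover tier 1: roll bad events to a local file for re-processing.
    a1.sinks.localFileSink.type = file_roll
    a1.sinks.localFileSink.channel = ch1
    a1.sinks.localFileSink.sink.directory = /var/log/flume/failed-events
    # Failover tier 2: a null sink, so events keep flowing even if the
    # local disk fills up.
    a1.sinks.nullSink.type = null
    a1.sinks.nullSink.channel = ch1
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = hdfsSink localFileSink nullSink
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.hdfsSink = 100
    a1.sinkgroups.g1.processor.priority.localFileSink = 10
    a1.sinkgroups.g1.processor.priority.nullSink = 1

The caveat from Anat's first message still applies: after one failure the whole hdfsSink is penalized, so good events also divert to the local file until the penalty expires.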
>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <[email protected]> wrote:
>>>
>>>> This sounds like a critical problem that can cause pipelines to block permanently. If you find yourself in this situation, a possible workaround would be to decommission the channel, remove its data, and route the flow through a new, empty channel. If you can identify which component is causing the problem, see whether you can remove it temporarily to let the problem events pass through another peer component.
>>>>
>>>> I have also created FLUME-2140 [1], which will eventually allow pipelines to identify and divert such bad events. If you have any logs, data, or configurations that can be shared and will help provide more details for this problem, it would be great if you could attach them to the jira and provide your comments.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>
>>>> Regards,
>>>> Arvind Prabhakar
>>>>
>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <[email protected]> wrote:
>>>>
>>>>> There's no way to deal with a bad event once it's in the channel, but you can mitigate future issues by having a timestamp interceptor bound to the source feeding the channel. There is a parameter, 'preserve existing', that will only add the header if it doesn't already exist. If you don't want 'bad' time data in there, you could instead try a static interceptor with a specific past date, so that corrupt events fall into a deterministic path in HDFS.
>>>>>
>>>>> I use this technique to prevent stuck events, both for timestamp headers and for some of our own custom headers that we use for tokenized paths. The static interceptor will insert an arbitrary header if it doesn't exist, so I have a couple that put in the value 'Unknown'; that way I can still send the events through the HDFS sink, but I can also find them later if need be.
>>>>>
>>>>> Hope that helps,
>>>>> Paul Chavez
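In configuration terms, Paul's safety net looks roughly like this; a minimal sketch with hypothetical names (a1, r1, appName), using the actual property key preserveExisting:

    # Timestamp interceptor: adds the 'timestamp' header with the current
    # time ONLY when it is missing, leaving good events untouched.
    a1.sources.r1.interceptors = i1 i2
    a1.sources.r1.interceptors.i1.type = timestamp
    a1.sources.r1.interceptors.i1.preserveExisting = true
    # Static interceptor: defaults a custom routing header to 'Unknown'
    # so malformed events still flow through the HDFS sink and remain
    # findable later. The header name 'appName' is an assumption.
    a1.sources.r1.interceptors.i2.type = static
    a1.sources.r1.interceptors.i2.key = appName
    a1.sources.r1.interceptors.i2.value = Unknown
    a1.sources.r1.interceptors.i2.preserveExisting = true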
>>>>> ------------------------------
>>>>> From: Roshan Naik [mailto:[email protected]]
>>>>> Sent: Thursday, August 01, 2013 10:27 AM
>>>>> To: [email protected]
>>>>> Subject: Re: Problem Events
>>>>>
>>>>> Some questions:
>>>>> - Why is the sink unable to consume the event?
>>>>> - How would you like to identify such an event? By examining its content, or by the fact that it's ping-ponging between channel and sink?
>>>>> - What would you prefer to do with such an event? Merely drop it?
>>>>>
>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <[email protected]> wrote:
>>>>>
>>>>>> To my knowledge (which is admittedly limited), there is no way to deal with these in a way that will make your day. I'm happy if someone can say otherwise.
>>>>>>
>>>>>> This is very similar to a problem I had a week or two ago. I fixed it by restarting Flume with debugging on, connecting to it with the debugger, and finding the message in the sink. I discovered a bug in the sink, downloaded Flume, fixed the bug, recompiled, installed the custom version, etc.
>>>>>>
>>>>>> I agree that this is not a practical solution, and I still believe that Flume needs some sort of "sink of last resort" option or something, like some JMS implementations have.
>>>>>>
>>>>>> -- Jeremy
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <[email protected]> wrote:
>>>>>>
>>>>>>> The message is already in the channel. Is there a way to write an interceptor to work after the channel, or before the sink?
>>>>>>>
>>>>>>> The only thing I found is to stop everything and delete the channel files, but I won't be able to use this approach in production :-(
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <[email protected]> wrote:
>>>>>>>
>>>>>>>> On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm having the same problem with the HDFS sink: a 'poison' message which doesn't have the timestamp header in it that the sink expects. This causes an NPE, which ends with the message being returned to the channel, over and over again.
>>>>>>>>>
>>>>>>>>> Is my only option to re-write the HDFS sink? Isn't there any way to intercept the sink's work?
>>>>>>>>
>>>>>>>> You can write a custom interceptor and remove/modify the poison message. Interceptors are called before a message makes its way into the channel.
>>>>>>>>
>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>
>>>>>>>> I wrote a blog about it a while back:
>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Anat
>>>>>>>>>
>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Sounds like a bug in the ElasticSearch sink to me. Do you mind filing a jira to track this? Sample data to reproduce it would be even better.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> This was using the provided ElasticSearch sink. The logs were not helpful. I ran it through with the debugger and found the source of the problem.
>>>>>>>>>>>
>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine whether the content is JSON: if it contains a "{" anywhere in it, it's considered JSON. My body contained that but wasn't JSON, causing the JSON parser to throw a CharConversionException from addComplexField(...) (rather than the expected JSONException). We've changed addComplexField(...) to catch different types of exceptions and fall back to treating the content as a simple field. We'll probably submit a patch for this soon.
>>>>>>>>>>>
>>>>>>>>>>> I'm reasonably happy with this, but I still think that in the bigger picture there should be some sort of mechanism to automatically detect and toss / skip / flag problematic events without them plugging up the flow.
>>>>>>>>>>>
>>>>>>>>>>> -- Jeremy
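The detection bug Jeremy describes is easy to illustrate outside Flume. A standalone, hedged sketch (using Jackson directly; this is not the actual ContentBuilderUtil code, which goes through elasticsearch's XContent machinery): instead of treating "contains a brace" as JSON and catching only one exception type, attempt a real parse and fall back on any parse failure:

    import java.io.IOException;

    import com.fasterxml.jackson.databind.ObjectMapper;

    // Illustration only: "try to parse, fall back on any failure".
    public class TolerantJsonDetector {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Returns true only if the body actually parses as JSON.
        // CharConversionException (the exception that escaped the original
        // catch block) is an IOException subtype, so it is caught here too.
        public static boolean isJson(byte[] body) {
            try {
                MAPPER.readTree(body);
                return true;
            } catch (IOException e) {
                // Malformed or mis-encoded input: treat as a simple field.
                return false;
            }
        }

        public static void main(String[] args) {
            System.out.println(isJson("{\"ok\":true}".getBytes()));          // true
            System.out.println(isJson("log line with { brace".getBytes())); // false
        }
    }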
>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Jeremy, would it be possible for you to show us the logs for the part where the sink fails to remove an event from the channel? I am assuming this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>>>
>>>>>>>>>>>> The reason I ask is that sinks do not introspect the event, and hence there is no reason why one would fail during the event's removal. It is more likely that there is a problem within the channel, in that it cannot dereference the event correctly. Looking at the logs will help us identify the root cause of what you are experiencing.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Both reasonable suggestions. What would a custom sink look like in this case, and how would I eliminate only the problem events, since I don't know what they are until they are attempted by the "real" sink?
>>>>>>>>>>>>>
>>>>>>>>>>>>> My philosophical concern (in general) is that we're taking the approach of exhaustively finding and eliminating possible failure cases. It's not possible to eliminate every single failure case, so shouldn't there be a method of last resort to eliminate problem events from the channel?
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Or you could write a custom sink that removes this event (more work, of course).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you have a way to identify such events, you may be able to use the Regex interceptor to toss them out before they get into the channel.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi everyone. My Flume adventures continue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's filling up because a stubborn message is stuck. The sink won't accept it (for whatever reason; I can go into detail, but that's not my point here). This just blocks up the channel entirely, because the message goes back into the channel when the sink refuses it. Obviously, this isn't ideal.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with these situations. Things that come to mind might be:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe a different source / sink / channel?) that someone can look at later.
>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually kill these messages.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>
>>>>>>>> --
>>>>>>>> thanks
>>>>>>>> ashish
>>>>>>>>
>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
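For completeness, Roshan's Regex-interceptor suggestion corresponds to the regex_filter interceptor. A minimal sketch with hypothetical names (a1, r1) and an assumed line format; the pattern would need to match the real logs:

    # regex_filter: keep only lines that start with a timestamp.
    # excludeEvents = false means "pass events that match, drop the rest".
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_filter
    a1.sources.r1.interceptors.i1.regex = ^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}
    a1.sources.r1.interceptors.i1.excludeEvents = false

The trade-off raised throughout the thread still applies: events dropped here are gone for good, so the failover-to-file arrangement sketched earlier is the safer variant when the data matters.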
