That's what I'll do: add a static timestamp of 1 and let all the 'bad' messages flow into one directory. Thanks!
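[Editor's note: a minimal sketch of the setup being discussed, not part of the original thread. The agent name `a1`, source name `r1`, sink name `hdfsSink`, and the regex are placeholder assumptions; adjust them to the real topology and log format.]

```properties
# Static interceptor runs first and stamps every event with a fallback
# timestamp of 1 (i.e. 1970-01-01), so regex misses land in one obvious
# directory instead of killing the HDFS sink with an NPE.
a1.sources.r1.interceptors = fallback extract
a1.sources.r1.interceptors.fallback.type = static
a1.sources.r1.interceptors.fallback.key = timestamp
a1.sources.r1.interceptors.fallback.value = 1

# Regex extractor runs second; when a line matches, its serializer
# overwrites the fallback header with the captured epoch-millis value.
# The regex below is a placeholder for the real log format.
a1.sources.r1.interceptors.extract.type = regex_extractor
a1.sources.r1.interceptors.extract.regex = ^(\\d{13})
a1.sources.r1.interceptors.extract.serializers = ts
a1.sources.r1.interceptors.extract.serializers.ts.name = timestamp

# The HDFS sink can then always resolve its time-based escape sequences.
a1.sinks.hdfsSink.hdfs.path = /flume/events/%Y-%m-%d
```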
On Wed, Aug 7, 2013 at 5:14 PM, Jonathan Cooper-Ellis <[email protected]> wrote:

> You can use a Static Interceptor before the RegexExtractor to add a timestamp of zero to the header, which can then be overwritten by the proper timestamp (if it exists). It should also sink misses into an obvious 'miss' directory.
>
> On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <[email protected]> wrote:
>
>> After some reading in the docs, I think the existing fail-over behavior can't be used to solve the 'poison' message problem, as it puts the 'failed' sink in a 'cooldown' period. Since the problem is in the message and not the sink, this means that after a poison message arrives, the HDFS sink will 'fail' and the next X messages will go to the failover sink. My only solution for now is to avoid my current problem and hope that I won't have any other problematic messages; I'd be glad to have a less fragile solution.
>>
>> Many thanks! Other than that, Flume looks like a great tool :-)
>>
>> Anat
>>
>> On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <[email protected]> wrote:
>>
>>> I think using a fail-over processor is a very good idea; I'll use it as an immediate solution. For the long run, I would like to see a general solution (not specific to the file channel; in my case the sink is HDFS), so the suggestion to add a 'Poison Message' sink to the sink processor sounds good.
>>>
>>> Just FYI, my problem is that a log file going through my source did not have (in all rows) the structure I expected.
>>> Since I used the regex extractor to set the timestamp, the 'bad' row didn't match the regexp and the timestamp was not set; the HDFS sink then throws an NPE on that:
>>>
>>> 01 Aug 2013 09:36:24,259 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:422) - process failed
>>> java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>     at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>     at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>     at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>     at java.lang.Thread.run(Thread.java:722)
>>> 01 Aug 2013 09:36:24,262 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
>>> org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>     at java.lang.Thread.run(Thread.java:722)
>>> Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>     at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>     at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>     at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>     ... 3 more
>>>
>>> I have fixed my regexp now; still, I can never be sure all the log files the system creates will be perfect, without any bad lines.
>>>
>>> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <[email protected]> wrote:
>>>
>>>> Just some more thoughts. The whole setup could be even easier: for the failover sink processor, you have the settings "max attempts" and "time between attempts", and it will just try one event X times before it gives up and sends it to the next sink. The time between attempts could even back off if needed.
>>>>
>>>> The advantage of this is that it preserves the ordering of events, something which gets completely broken in the previous scenario.
>>>>
>>>> - Connor
>>>>
>>>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <[email protected]> wrote:
>>>>
>>>>> As another option to solve the problem of having a bad event in a channel: using a fail-over sink processor, log all bad events to a local file.
>>>>> And to be extra cautious, add a third failover of a null sink. This will mean that events will always flow through your channel. The file sink should almost never fail, so you shouldn't be losing events in the process. And then you can re-process everything in the file if you still want those events for something.
>>>>>
>>>>> For the system of having Flume detect bad events, I think implementing something like the above is better than discarding events that fail X times. For instance, if you have an Avro sink -> Avro source and you're restarting your source, Flume would end up discarding events unnecessarily. Instead, how about implementing the above system and then going a step further: Flume will attempt to re-send the bad events itself, and if a bad event can't be sent after X attempts, it can be discarded.
>>>>>
>>>>> I envision this system as an extension to the current File Channel: when an event fails, it is written to a secondary File Channel from which events can be pulled when the main channel isn't in use. It would add headers like "lastAttempt" and "numberOfAttempts" to events, and it could be configurable with a "min time between attempts" and a "maximum attempts." When an event fails a second time, those headers are updated and it goes back into the fail-channel. If it comes out of the fail-channel but lastAttempt is too recent, it goes back in. If it fails more times than the maximum, it is written to a final location (perhaps it's just sent to another sink; maybe this would have to be in a sink processor). Assuming all of those steps are error-free, all messages are preserved, and the badly-formatted events eventually get stored somewhere else.
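[Editor's note: a hedged sketch of the fail-over arrangement described above, not from the original thread. The agent name `a1`, sink names, and directory are assumptions; note also Anat's caveat earlier in the thread that the failover processor penalizes the whole sink, not the one message, so good events may divert too while the HDFS sink is in cooldown.]

```properties
# Failover sink group: HDFS first, a local file_roll sink for events the
# HDFS sink rejects, and a null sink as the very last resort so the
# channel always drains.
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = hdfsSink fileSink nullSink
a1.sinkgroups.g1.processor.type = failover
# Higher priority is tried first.
a1.sinkgroups.g1.processor.priority.hdfsSink = 10
a1.sinkgroups.g1.processor.priority.fileSink = 5
a1.sinkgroups.g1.processor.priority.nullSink = 1
# Maximum backoff (ms) applied to a failed sink before it is retried.
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Fallback sinks: failed events can be re-processed later from this file.
a1.sinks.fileSink.type = file_roll
a1.sinks.fileSink.sink.directory = /var/log/flume/failed-events
a1.sinks.nullSink.type = null
```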
>>>>> (This system could be hacked together with current code - fail-over sink processor -> avro sink -> avro source on the same instance - but that's a little too hacky.)
>>>>>
>>>>> Just some thoughts.
>>>>>
>>>>> - Connor
>>>>>
>>>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <[email protected]> wrote:
>>>>>
>>>>>> This sounds like a critical problem that can cause pipelines to block permanently. If you find yourself in this situation, a possible workaround would be to decommission the channel, remove its data and route the flow through a new empty channel. If you have the ability to identify which component is causing the problem, see if you can remove it temporarily to let the problem events pass through another peer component.
>>>>>>
>>>>>> I have also created FLUME-2140 [1], which will eventually allow pipelines to identify and divert such bad events. If you have any logs, data, or configurations that can be shared and would help provide more details on this problem, it would be great if you could attach them to this jira along with your comments.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>>>
>>>>>> Regards,
>>>>>> Arvind Prabhakar
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <[email protected]> wrote:
>>>>>>
>>>>>>> There's no way to deal with a bad event once it's in the channel, but you can mitigate future issues by having a timestamp interceptor bound to the source feeding the channel. There is a parameter 'preserve existing' that will only add the header if it doesn't exist. If you don't want to have 'bad' time data in there, you could try a static interceptor with a specific past date so that corrupt events fall into a deterministic path in HDFS.
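[Editor's note: a sketch of Paul's mitigation, not from the original thread. The names are assumptions, 'preserve existing' is spelled `preserveExisting` in the Flume user guide, and the specific past-date value is an arbitrary illustration.]

```properties
# Option A: timestamp interceptor that fills in the current wall-clock
# time only when the timestamp header is missing.
a1.sources.r1.interceptors = ts
a1.sources.r1.interceptors.ts.type = timestamp
a1.sources.r1.interceptors.ts.preserveExisting = true

# Option B: static interceptor with a fixed past date, so corrupt events
# fall into one deterministic HDFS path instead of today's bucket.
# 946684800000 ms = 2000-01-01T00:00:00 UTC (arbitrary choice).
# a1.sources.r1.interceptors = pastdate
# a1.sources.r1.interceptors.pastdate.type = static
# a1.sources.r1.interceptors.pastdate.key = timestamp
# a1.sources.r1.interceptors.pastdate.value = 946684800000
```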
>>>>>>> I use this technique to prevent stuck events for both timestamp headers and some of our own custom headers we use for tokenized paths. The static interceptor will insert an arbitrary header if it doesn't exist, so I have a couple that put in the value 'Unknown'; that way I can still send the events through the HDFS sink but can also find them later if need be.
>>>>>>>
>>>>>>> Hope that helps,
>>>>>>> Paul Chavez
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* Roshan Naik [mailto:[email protected]]
>>>>>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>>>>>> *To:* [email protected]
>>>>>>> *Subject:* Re: Problem Events
>>>>>>>
>>>>>>> Some questions:
>>>>>>> - Why is the sink unable to consume the event?
>>>>>>> - How would you like to identify such an event? By examining its content, or by the fact that it's ping-pong-ing between channel and sink?
>>>>>>> - What would you prefer to do with such an event? Merely drop it?
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>
>>>>>>>> To my knowledge (which is admittedly limited), there is no way to deal with these in a way that will make your day. I'm happy if someone can say otherwise.
>>>>>>>>
>>>>>>>> This is very similar to a problem I had a week or two ago. I fixed it by restarting Flume with debugging on, connecting to it with the debugger, and finding the message in the sink. I discovered a bug in the sink, downloaded Flume, fixed the bug, recompiled, installed the custom version, etc.
>>>>>>>>
>>>>>>>> I agree that this is not a practical solution, and I still believe that Flume needs some sort of "sink of last resort" option or something, like JMS implementations have.
>>>>>>>> -- Jeremy
>>>>>>>>
>>>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> The message is already in the channel. Is there a way to write an interceptor to work after the channel, or before the sink?
>>>>>>>>>
>>>>>>>>> The only thing I found is to stop everything and delete the channel files, but I won't be able to use this approach in production :-(
>>>>>>>>>
>>>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm having the same problem with the HDFS sink: a 'poison' message which doesn't have the timestamp header in it that the sink expects. This causes an NPE, which ends in returning the message to the channel, over and over again.
>>>>>>>>>>>
>>>>>>>>>>> Is my only option to re-write the HDFS sink? Isn't there any way to intercept the sink's work?
>>>>>>>>>>
>>>>>>>>>> You can write a custom interceptor and remove/modify the poison message. Interceptors are called before a message makes its way into the channel.
>>>>>>>>>>
>>>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>>>
>>>>>>>>>> I wrote a blog post about it a while back:
>>>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Anat
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Sounds like a bug in the ElasticSearch sink to me. Do you mind filing a Jira to track this?
>>>>>>>>>>>> Sample data to cause this would be even better.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> This was using the provided ElasticSearch sink. The logs were not helpful. I ran it through with the debugger and found the source of the problem.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine whether the content is JSON: if it contains a "{" anywhere, it's considered JSON. My body contained that but wasn't JSON, causing the JSON parser to throw a CharConversionException from addComplexField(...) (not the expected JSONException). We've changed addComplexField(...) to catch different types of exceptions and fall back to treating the content as a simple field. We'll probably submit a patch for this soon.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm reasonably happy with this, but I still think that in the bigger picture there should be some sort of mechanism to automatically detect and toss / skip / flag problematic events without them plugging up the flow.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the part where the sink fails to remove an event from the channel? I am assuming this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>>>>> The reason I ask is that sinks do not introspect the event, and hence there is no reason why one would fail during the event's removal. It is more likely that there is a problem within the channel, in that it cannot dereference the event correctly. Looking at the logs will help us identify the root cause of what you are experiencing.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Both reasonable suggestions. What would a custom sink look like in this case, and how would I eliminate only the problem events, given that I don't know which they are until they are attempted by the "real" sink?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My philosophical concern (in general) is that we're taking the approach of exhaustively finding and eliminating possible failure cases. It's not possible to eliminate every single failure case, so shouldn't there be a method of last resort to eliminate problem events from the channel?
>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Or you could write a custom sink that removes this event (more work, of course).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you have a way to identify such events, you may be able to use the Regex interceptor to toss them out before they get into the channel.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi everyone. My Flume adventures continue.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's filling up because a stubborn message is stuck. The sink won't accept it (for whatever reason; I can go into detail, but that's not my point here). This blocks up the channel entirely, because the event goes back into the channel when the sink refuses it. Obviously, this isn't ideal.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with these situations. Things that come to mind:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe a different source / sink / channel?) that someone can look at later.
>>>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually kill these messages.
>>>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> thanks
>>>>>>>>>> ashish
>>>>>>>>>>
>>>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
