That's what I'll do: add a static timestamp of 1 and let all the 'bad'
messages flow into one directory.
Thanks
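
A rough sketch of what the setup described above might look like in an agent
configuration. The agent, source, and interceptor names (a1, r1, i1, i2), the
regex, and the date pattern are hypothetical placeholders and are not taken
from this thread; the property names are the ones the Flume user guide lists
for the static and regex_extractor interceptors, so check them against the
guide for your Flume version.

```properties
# Hypothetical agent "a1" with source "r1"; interceptors run in listed order.
a1.sources.r1.interceptors = i1 i2

# i1: give every event a fallback timestamp header (1 ms after the epoch).
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = timestamp
a1.sources.r1.interceptors.i1.value = 1

# i2: when a row matches, replace the fallback with the parsed time.
# The regex and date pattern below are assumptions about the log format.
a1.sources.r1.interceptors.i2.type = regex_extractor
a1.sources.r1.interceptors.i2.regex = ^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})
a1.sources.r1.interceptors.i2.serializers = s1
a1.sources.r1.interceptors.i2.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i2.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i2.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss
```

With an HDFS sink path built from date escapes such as %Y/%m/%d, rows that
miss the regex should then land in a single directory dated around 1 January
1970 (depending on time zone) instead of triggering the NPE.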


On Wed, Aug 7, 2013 at 5:14 PM, Jonathan Cooper-Ellis <[email protected]> wrote:

> You can use a Static Interceptor before the RegexExtractor to add a
> timestamp of zero to the header, which can then be overwritten by the
> proper timestamp (if it exists). This should also sink the misses into an
> obvious 'miss' directory.
>
>
> On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <[email protected]> wrote:
>
>> After some reading in the docs, I think the existing fail-over behavior
>> can't be used to solve the 'poison' message problem, as it puts the
>> 'failed' sink into a 'cooldown' period.
>> As the problem is in the message and not the sink, this means that after
>> a poison message arrives, the HDFS sink will 'fail' and thus the next X
>> messages will go to the failover sink.
>> My only solution for now is to avoid my current problem and hope that I
>> won't have any other problematic messages. I'd be glad to have a less
>> fragile solution.
>>
>> Many thanks!
>> Other than that, Flume looks like a great tool :-)
>>
>> Anat
>>
>>
>> On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <[email protected]> wrote:
>>
>>> I think using a fail-over processor is a very good idea; I think I'll
>>> use it as an immediate solution.
>>> For the long run, I would like to see a general solution (not specific
>>> to file channel, in my case it is an HDFS channel), so the suggestion to
>>> add a 'Poison Message' sink to the sink processor sounds good.
>>>
>>> Just FYI, my problem is that a log file going through my source did not
>>> have (in all rows) the structure I expected.
>>>
>>> Since I used the regexp extractor to set the timestamp, the 'bad' row
>>> didn't match the regexp and the timestamp was not set, so the HDFS sink
>>> throws an NPE on that:
>>> 01 Aug 2013 09:36:24,259 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
>>> java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>         at java.lang.Thread.run(Thread.java:722)
>>> 01 Aug 2013 09:36:24,262 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
>>> org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>         at java.lang.Thread.run(Thread.java:722)
>>> Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>         ... 3 more
>>>
>>>
>>> I have fixed my regexp now; still, I can never be sure that all the log
>>> files the system creates will be perfect, without any bad lines.
>>>
>>>
>>> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <[email protected]> wrote:
>>>
>>>> Just some more thoughts. The whole setup could be even easier: for the
>>>> failover sink processor, you have those settings "max attempts" and
>>>> "time between attempts" and it will just try one event X times before it
>>>> gives up and sends it to the next sink. The time between attempts could
>>>> even back off if needed.
>>>>
>>>> The advantage of this is that it preserves the ordering of events,
>>>> something which gets completely broken in the previous scenario.
>>>>
>>>> - Connor
>>>>
>>>>
>>>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <[email protected]> wrote:
>>>>
>>>>> As another option to solve the problem of having a bad event in a
>>>>> channel: using a fail-over sink processor, log all bad events to a local
>>>>> file. And to be extra cautious, add a third failover of a null sink. This
>>>>> will mean that events will always flow through your channel. The file sink
>>>>> should almost never fail, so you shouldn't be losing events in the
>>>>> process. And then you can re-process everything in the file if you still
>>>>> want those events for something.
>>>>>
>>>>> For the system of having Flume detect bad events, I think implementing
>>>>> something like the above is better than discarding events that fail X
>>>>> times. For instance, if you have an Avro sink -> Avro source, and you're
>>>>> restarting your source, Flume would end up discarding events
>>>>> unnecessarily. Instead, how about implementing the above system and then
>>>>> going a step further: Flume will attempt to re-send the bad events itself.
>>>>> And then if a bad event isn't able to be sent after X attempts, it can be
>>>>> discarded.
>>>>>
>>>>> I envision this system as an extension to the current File Channel:
>>>>> when an event fails, it is written to a secondary File Channel from which
>>>>> events can be pulled when the main channel isn't in use. It would add
>>>>> headers like "lastAttempt" and "numberOfAttempts" to events. Then it can
>>>>> be configurable for a "min time between attempts" and "maximum
>>>>> attempts." When an event fails the second time, those headers are updated
>>>>> and it goes back into the fail-channel. If it comes out of the
>>>>> fail-channel but the lastAttempt is too recent, it goes back in. If it
>>>>> fails more times than the maximum, it is written to a final location
>>>>> (perhaps it's just sent to another sink; maybe this would have to be in a
>>>>> sink processor). Assuming all of those steps are error-free, then all
>>>>> messages are preserved, and the badly formatted ones eventually get
>>>>> stored somewhere else. (This system could be hacked together with current
>>>>> code - fail over sink processor -> avro sink -> avro source on same
>>>>> instance, but that's a little too hacky).
>>>>>
>>>>> Just some thoughts.
>>>>>
>>>>> - Connor
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <[email protected]> wrote:
>>>>>
>>>>>> This sounds like a critical problem that can cause pipelines to block
>>>>>> permanently. If you find yourself in this situation, a possible
>>>>>> workaround would be to decommission the channel, remove its data and
>>>>>> route the flow with a new empty channel. If you are able to identify
>>>>>> which component is causing the problem, see if you can remove it
>>>>>> temporarily to let the problem events pass through another peer
>>>>>> component.
>>>>>>
>>>>>> I have also created FLUME-2140 [1] which will eventually allow the
>>>>>> pipelines to identify and divert such bad events. If you have any logs,
>>>>>> data, or configurations that can be shared and will help provide more
>>>>>> details for this problem, it would be great if you could attach them to
>>>>>> this Jira and provide your comments.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>>>
>>>>>> Regards,
>>>>>> Arvind Prabhakar
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> There's no way to deal with a bad event once it's in the channel,
>>>>>>> but you can mitigate future issues by having a timestamp interceptor
>>>>>>> bound to the source feeding the channel. There is a parameter
>>>>>>> 'preserve existing' that will only add the header if it doesn't exist.
>>>>>>> If you don't want to have 'bad' time data in there you could try a
>>>>>>> static interceptor with a specific past date so that corrupt events
>>>>>>> fall into a deterministic path in HDFS.
>>>>>>>
>>>>>>> I use this technique to prevent stuck events for both timestamp
>>>>>>> headers as well as some of our own custom headers we use for tokenized
>>>>>>> paths. The static interceptor will insert an arbitrary header if it
>>>>>>> doesn't exist, so I have a couple that put in the value 'Unknown' so
>>>>>>> that I can still send the events through the HDFS sink but I can also
>>>>>>> find them later if need be.
>>>>>>>
>>>>>>> hope that helps,
>>>>>>> Paul Chavez
>>>>>>>
>>>>>>>  ------------------------------
>>>>>>> *From:* Roshan Naik [mailto:[email protected]]
>>>>>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>>>>>> *To:* [email protected]
>>>>>>> *Subject:* Re: Problem Events
>>>>>>>
>>>>>>>  some questions:
>>>>>>> - why is the sink unable to consume the event?
>>>>>>> - how would you like to identify such an event? by examining its
>>>>>>> content? or by the fact that it's ping-ponging between channel and sink?
>>>>>>> - what would you prefer to do with such an event? merely drop it?
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> To my knowledge (which is admittedly limited), there is no way to
>>>>>>>> deal with these in a way that will make your day.  I'm happy if
>>>>>>>> someone can say otherwise.
>>>>>>>>
>>>>>>>> This is very similar to a problem I had a week or two ago.  I fixed
>>>>>>>> it by restarting Flume with debugging on, connecting to it with the
>>>>>>>> debugger, and finding the message in the sink.  Discovered a bug in
>>>>>>>> the sink.  Downloaded Flume, fixed bug, recompiled, installed custom
>>>>>>>> version, etc.
>>>>>>>>
>>>>>>>> I agree that this is not a practical solution, and I still believe
>>>>>>>> that Flume needs some sort of "sink of last resort" option or
>>>>>>>> something, like JMS implementations.
>>>>>>>>
>>>>>>>> -- Jeremy
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <[email protected]> wrote:
>>>>>>>>
>>>>>>>>>  The message is already in the channel.
>>>>>>>>> Is there a way to write an interceptor to work after the channel?
>>>>>>>>> or before the sink?
>>>>>>>>>
>>>>>>>>> The only thing I found is to stop everything and delete the
>>>>>>>>> channel files, but I won't be able to use this approach in
>>>>>>>>> production :-(
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <[email protected]>wrote:
>>>>>>>>>>
>>>>>>>>>>>   Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm having the same problem with HDFS sink.
>>>>>>>>>>>
>>>>>>>>>>> A 'poison' message arrived which doesn't have the timestamp
>>>>>>>>>>> header that the sink expects.
>>>>>>>>>>> This causes an NPE, which ends in the message being returned to
>>>>>>>>>>> the channel, over and over again.
>>>>>>>>>>>
>>>>>>>>>>> Is my only option to rewrite the HDFS sink?
>>>>>>>>>>> Isn't there any way to intercept the sink's work?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> You can write a custom interceptor and remove/modify the poison
>>>>>>>>>> message.
>>>>>>>>>>
>>>>>>>>>> Interceptors are called before the message makes its way into the
>>>>>>>>>> channel.
>>>>>>>>>>
>>>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>>>
>>>>>>>>>> I wrote a blog about it a while back
>>>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Anat
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind
>>>>>>>>>>>> filing a Jira to track this? Sample data to cause this would be
>>>>>>>>>>>> even better.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> This was using the provided ElasticSearch sink.  The logs were
>>>>>>>>>>>>> not helpful.  I ran it through with the debugger and found the
>>>>>>>>>>>>> source of the problem.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to
>>>>>>>>>>>>> determine if the content is JSON; if it contains a "{" anywhere
>>>>>>>>>>>>> in it, it's considered JSON.  My body contained that but wasn't
>>>>>>>>>>>>> JSON, causing the JSON parser to throw a CharConversionException
>>>>>>>>>>>>> from addComplexField(...) (but not the expected JSONException).
>>>>>>>>>>>>> We've changed addComplexField(...) to catch different types of
>>>>>>>>>>>>> exceptions and fall back to treating it as a simple field.
>>>>>>>>>>>>> We'll probably submit a patch for this soon.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm reasonably happy with this, but I still think that in the
>>>>>>>>>>>>> bigger picture there should be some sort of mechanism to
>>>>>>>>>>>>> automatically detect and toss / skip / flag problematic events
>>>>>>>>>>>>> without them plugging up the flow.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the
>>>>>>>>>>>>>> part where the sink fails to remove an event from the channel?
>>>>>>>>>>>>>> I am assuming this is a standard sink that Flume provides and
>>>>>>>>>>>>>> not a custom one.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The reason I ask is because sinks do not introspect the
>>>>>>>>>>>>>> event, and hence there is no reason why it will fail during the
>>>>>>>>>>>>>> event's removal. It is more likely that there is a problem
>>>>>>>>>>>>>> within the channel in that it cannot dereference the event
>>>>>>>>>>>>>> correctly. Looking at the logs will help us identify the root
>>>>>>>>>>>>>> cause for what you are experiencing.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Both reasonable suggestions.  What would a custom sink
>>>>>>>>>>>>>>> look like in this case, and how would I only eliminate the
>>>>>>>>>>>>>>> problem events since I don't know what they are until they are
>>>>>>>>>>>>>>> attempted by the "real" sink?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My philosophical concern (in general) is that we're taking
>>>>>>>>>>>>>>> the approach of exhaustively finding and eliminating possible
>>>>>>>>>>>>>>> failure cases.  It's not possible to eliminate every single
>>>>>>>>>>>>>>> failure case, so shouldn't there be a method of last resort to
>>>>>>>>>>>>>>> eliminate problem events from the channel?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Or you could write a custom sink that removes this event
>>>>>>>>>>>>>>>> (more work of course)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you have a way to identify such events, you may be
>>>>>>>>>>>>>>>> able to use the Regex interceptor to toss them out before
>>>>>>>>>>>>>>>> they get into the channel.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  Hi everyone.  My Flume adventures continue.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's
>>>>>>>>>>>>>>>> filling because a stubborn message is stuck.  The sink won't
>>>>>>>>>>>>>>>> accept it (for whatever reason; I can go into detail but
>>>>>>>>>>>>>>>> that's not my point here).  This just blocks up the channel
>>>>>>>>>>>>>>>> entirely, because it goes back into the channel when the sink
>>>>>>>>>>>>>>>> refuses.  Obviously, this isn't ideal.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal
>>>>>>>>>>>>>>>> with these situations.  Things that come to mind might be:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area"
>>>>>>>>>>>>>>>> (maybe a different source / sink / channel?) that someone can
>>>>>>>>>>>>>>>> look at later.
>>>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually
>>>>>>>>>>>>>>>> kill these messages.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> thanks
>>>>>>>>>> ashish
>>>>>>>>>>
>>>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
