@Gwen @Hari

My use case is as follows:
ScribeClient => [Agent1: ScribeSource => KafkaChannel1] => Kafka Cluster =>
[Agent2: KafkaChannel2 => HDFSEventSink] => HDFS

The bad case is as follows:
My HDFSEventSink needs a "*timestamp*" header, but some dirty data in Kafka
(put there by mistake) doesn't have the "timestamp" header, which causes the
following BucketPath.escapeString call to throw a *NullPointerException*:
String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);

*I think Gwen's second point is OK: we can add an interceptor to do the
filtering.*

But my Flume agents are kind of special:
Agent1 has no sink; it sends messages directly to the Kafka cluster through
KafkaChannel1.
Agent2 has no source; it polls events directly from the Kafka cluster through
KafkaChannel2.
Agent1 and Agent2 run in different JVMs and are deployed on different nodes.

*I don't know whether it's reasonable for an agent to have no sink or no
source.* But I have already built the whole workflow, and it works well for
me in regular cases.

*Because Agent2 has no source, and interceptors are attached to sources, I
can't use Gwen's interceptor suggestion there.*
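
For a sourceless agent like Agent2, the retry limit I suggested earlier in
this thread (quoted below) could look roughly like the following. This is
only a sketch of the idea in plain Java, not an existing Flume option: the
class, the Outcome enum, and the maxAttempts / commitOnGiveUp parameters are
all hypothetical, and the sink is modelled as a simple Consumer<String>.

```java
import java.util.function.Consumer;

// Hypothetical illustration of a bounded retry around one process() call.
public class BoundedRetryDemo {
    enum Outcome { DELIVERED, SKIPPED, STUCK }

    // Tries the sink up to maxAttempts times. On success the offset would be
    // committed (DELIVERED). After the last failure, commitOnGiveUp decides
    // whether the offset is committed anyway (SKIPPED, the event is lost) or
    // left alone (STUCK, the same event will be polled again).
    static Outcome processWithLimit(String event, Consumer<String> sink,
                                    int maxAttempts, boolean commitOnGiveUp) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sink.accept(event);
                return Outcome.DELIVERED;
            } catch (RuntimeException e) {
                // A real sink would roll back its transaction and back off here.
            }
        }
        return commitOnGiveUp ? Outcome.SKIPPED : Outcome.STUCK;
    }

    public static void main(String[] args) {
        // Stand-in sink that fails on dirty data, like the missing-header case.
        Consumer<String> sink = e -> {
            if (e.startsWith("dirty")) {
                throw new NullPointerException("missing timestamp header");
            }
        };
        System.out.println(processWithLimit("good-event", sink, 3, true));
        System.out.println(processWithLimit("dirty-event", sink, 3, true));
        System.out.println(processWithLimit("dirty-event", sink, 3, false));
    }
}
```

With commitOnGiveUp set to true the dirty event is skipped and lost, which is
the data-loss risk Hari describes below, so it would have to be an explicit
opt-in.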

2015-04-18 2:30 GMT+08:00 Hari Shreedharan <[email protected]>:

> What I think he means is that a message in the channel that cannot be
> serialized by the serializer because it is malformed causing the serializer
> to fail and perhaps throw (think malformed Avro). Such a message basically
> would be stuck in an infinite loop. So the workaround in (2) would work if
> using a Kafka Source.
>
> Thanks,
> Hari
>
>
> On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <[email protected]> wrote:
>
>> OK, I got it, Thanks.
>>
>> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <[email protected]>:
>>
>>> Are you using Kafka channel? The fix I mentioned was for file channel.
>>> Unfortunately, we don't plan to introduce something that drops data in real
>>> time. This makes it too easy for a misconfig to cause data loss. You'd have
>>> to ensure the data in the Kafka channel is valid.
>>>
>>> Thanks,
>>> Hari
>>>
>>>
>>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <[email protected]> wrote:
>>>
>>>> @Hari, you mean I need to ensure the data in Kafka is OK by myself,
>>>> right?
>>>>
>>>> How about a config that lets the user decide how to handle BACKOFF?
>>>> For example, we could configure the max number of retries in process(),
>>>> and also configure whether or not to commit when the max is exceeded. (In
>>>> my Kafka case, when dirty data shows up, committing the consumer offset
>>>> would be better for me than an endless loop.)
>>>>
>>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan <[email protected]>:
>>>>
>>>>> We recently added functionality to the file channel integrity tool
>>>>> that can be used to remove bad events from the channel - though you would
>>>>> need to write some code to validate events. It will be in the soon to be
>>>>> released 1.6.0
>>>>>
>>>>> Thanks,
>>>>> Hari
>>>>>
>>>>>
>>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi all:
>>>>>>
>>>>>> My use case is KafkaChannel + HDFSEventSink.
>>>>>>
>>>>>> I found that SinkRunner.PollingRunner calls
>>>>>> HDFSEventSink.process() in a while loop. For example, if a message in
>>>>>> Kafka contains dirty data, HDFSEventSink.process() consumes the message
>>>>>> from Kafka, throws an exception because of the *dirty data*, and the
>>>>>> *Kafka offset is not committed*. The outer loop then calls
>>>>>> HDFSEventSink.process() again. Because the Kafka offset hasn't changed,
>>>>>> HDFSEventSink consumes the dirty data *again*. The bad loop *never
>>>>>> stops*.
>>>>>>
>>>>>> *I want to know whether there is a mechanism to cover this case.*
>>>>>> For example, a max number of retries for a single
>>>>>> HDFSEventSink.process() call, giving up when the limit is exceeded.
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
