What I think he means is a message in the channel that is malformed and
therefore cannot be serialized, causing the serializer to fail and perhaps
throw (think malformed Avro). Such a message would basically be stuck in an
infinite loop. So the workaround in (2) would work if using a Kafka Source.
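
To make the failure mode concrete, here is a minimal standalone sketch (plain
Java, no Flume dependency) of why the same event keeps coming back. The queue
stands in for the channel and serialize() for whatever serializer the sink
uses; all of the names are made up for illustration:

import java.util.ArrayDeque;
import java.util.Deque;

public class StuckEventDemo {

    // Stand-in for the sink's serializer; throws on "malformed" input.
    static void serialize(String body) {
        if (body.startsWith("BAD")) {
            throw new RuntimeException("cannot serialize: " + body);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the channel, with one malformed event at the head.
        Deque<String> channel = new ArrayDeque<>();
        channel.add("BAD record");
        channel.add("good record");

        // SinkRunner.PollingRunner keeps calling process(); capped at 5 here.
        for (int attempt = 1; attempt <= 5; attempt++) {
            String event = channel.peek();      // take() inside a transaction
            try {
                serialize(event);
                channel.poll();                 // commit: event leaves the channel
                System.out.println("attempt " + attempt + " delivered: " + event);
            } catch (RuntimeException e) {
                // rollback: the event stays at the head of the channel (the
                // Kafka offset is not committed), so the next process() call
                // sees the very same event again.
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
    }
}

Every attempt fails on the same event, so the sink never makes progress until
the bad event is removed or the data is fixed upstream.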



Thanks, Hari

On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <[email protected]> wrote:

> OK, I got it, Thanks.
> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <[email protected]>:
>> Are you using Kafka channel? The fix I mentioned was for file channel.
>> Unfortunately, we don't plan to introduce something that drops data in real
>> time. This makes it too easy for a misconfig to cause data loss. You'd have
>> to ensure the data in the Kafka channel is valid.
>>
>> Thanks,
>> Hari
>>
>>
>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <[email protected]> wrote:
>>
>>> @Hari, you mean I need to ensure the data in Kafka is OK myself,
>>> right?
>>>
>>> How about we have a config to let the user decide how to handle BACKOFF?
>>> For example, we could configure a max retry count for process(), and also
>>> configure whether or not to commit when the max retry count is exceeded.
>>> (In my Kafka case, when I hit dirty data, committing the consumer offset
>>> would be much nicer for me than an endless loop.)
>>>
>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan <[email protected]>:
>>>
>>>> We recently added functionality to the file channel integrity tool that
>>>> can be used to remove bad events from the channel - though you would need
>>>> to write some code to validate events. It will be in the soon-to-be-released
>>>> 1.6.0.
>>>>
>>>> Thanks,
>>>> Hari
>>>>
>>>>
>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <[email protected]> wrote:
>>>>
>>>>> Hi all:
>>>>>
>>>>> My use case is KafkaChannel + HDFSEventSink.
>>>>>
>>>>> I found that SinkRunner.PollingRunner calls HDFSEventSink.process()
>>>>> in a while loop. For example, a message in Kafka contains dirty data, so
>>>>> HDFSEventSink.process() consumes the message from Kafka, throws an
>>>>> exception because of the *dirty data*, and the *Kafka offset is not
>>>>> committed*. The outer loop then calls HDFSEventSink.process() again.
>>>>> Because the Kafka offset hasn't changed, HDFSEventSink consumes the dirty
>>>>> data *again*. The bad loop *never stops*.
>>>>>
>>>>> *I want to know whether we have a mechanism to cover this case.* For
>>>>> example, a max retry count for a single HDFSEventSink.process() call,
>>>>> giving up when the max limit is exceeded.
>>>>>
>>>>>
>>>>
>>>
>>
