1. Can you be more specific about "dirty data" that prevents the sink
from pulling events out of the channel? AFAIK, Kafka doesn't care what
data you put in there, and neither does the HDFS sink.

2. Perhaps a workaround could be:
Kafka Source -> interceptor that validates the data and drops dirty
events -> Kafka channel (another topic) -> HDFS Sink

(Assuming the Kafka Source manages to get the dirty data out of Kafka in
the first place.) A rough sketch of such an interceptor is below.
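
Something along these lines -- untested, and the class name, package, and
isValid() check are placeholders, since what counts as "clean" depends
entirely on your data format:

package com.example;  // hypothetical package

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class DirtyDataFilterInterceptor implements Interceptor {

  @Override
  public void initialize() {
    // nothing to set up
  }

  @Override
  public Event intercept(Event event) {
    // Returning null tells Flume to drop the event.
    return isValid(event) ? event : null;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    // Keep only the events that pass validation.
    List<Event> out = new ArrayList<Event>(events.size());
    for (Event event : events) {
      Event intercepted = intercept(event);
      if (intercepted != null) {
        out.add(intercepted);
      }
    }
    return out;
  }

  @Override
  public void close() {
    // nothing to clean up
  }

  // Placeholder check -- replace with real validation, e.g. try to
  // parse the event body and return false if parsing fails.
  private boolean isValid(Event event) {
    byte[] body = event.getBody();
    return body != null && body.length > 0;
  }

  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new DirtyDataFilterInterceptor();
    }

    @Override
    public void configure(Context context) {
      // read validation options from the agent config if needed
    }
  }
}

Then attach it to the source in the agent config (agent and source names
here are made up):

agent.sources.kafkaSource.interceptors = clean
agent.sources.kafkaSource.interceptors.clean.type = com.example.DirtyDataFilterInterceptor$Builder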

On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <[email protected]> wrote:
> OK, I got it, Thanks.
>
> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <[email protected]>:
>>
>> Are you using Kafka channel? The fix I mentioned was for file channel.
>> Unfortunately, we don't plan to introduce something that drops data in real
>> time. This makes it too easy for a misconfig to cause data loss. You'd have
>> to ensure the data in the Kafka channel is valid.
>>
>> Thanks,
>> Hari
>>
>>
>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <[email protected]> wrote:
>>>
>>> @Hari, you mean I need to ensure the data in Kafka is valid myself,
>>> right?
>>>
>>> How about a config option that lets the user decide how to handle
>>> BACKOFF? For example, we could configure the max retry count in
>>> process(), and also whether to commit or not when the max retry count
>>> is exceeded. (In my Kafka case, when the sink meets dirty data,
>>> committing the consumer offset would be much nicer for me than an
>>> endless loop.)
>>>
>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan <[email protected]>:
>>>>
>>>> We recently added functionality to the file channel integrity tool
>>>> that can be used to remove bad events from the channel - though you
>>>> would need to write some code to validate events. It will be in the
>>>> soon-to-be-released 1.6.0.
>>>>
>>>> Thanks,
>>>> Hari
>>>>
>>>>
>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <[email protected]> wrote:
>>>>>
>>>>> Hi all:
>>>>>
>>>>> My use case is KafkaChannel + HDFSEventSink.
>>>>>
>>>>> I found that SinkRunner.PollingRunner calls HDFSEventSink.process()
>>>>> in a while loop. For example, suppose a message in Kafka contains
>>>>> dirty data: HDFSEventSink.process() consumes the message from Kafka,
>>>>> throws an exception because of the dirty data, and the Kafka offset
>>>>> is not committed. The outer loop then calls HDFSEventSink.process()
>>>>> again. Because the Kafka offset hasn't changed, HDFSEventSink
>>>>> consumes the same dirty data again, so the bad loop never stops.
>>>>>
>>>>> I want to know whether there is a mechanism to cover this case. For
>>>>> example, a max retry count for a single HDFSEventSink.process() call,
>>>>> giving up when the limit is exceeded.
>>>>>
>>>>
>>>
>>
>
