What I think he means is a message in the channel that cannot be serialized because it is malformed, causing the serializer to fail and perhaps throw (think malformed Avro). Such a message would basically be stuck in an infinite loop. So the workaround in (2) would work if using a Kafka Source.
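
As an illustration of that kind of source-side filtering, here is a minimal sketch of a custom interceptor that drops events whose body does not decode against an Avro schema, so malformed records never reach the channel. The class name, the "schema" config key, and the validation approach are assumptions made for the sketch; none of this is part of Flume itself.

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Hypothetical example class, not part of Flume: drops events that fail to
// decode against a configured Avro schema before they enter the channel.
public class AvroValidatingInterceptor implements Interceptor {

  private final Schema schema;
  private GenericDatumReader<Object> reader;

  private AvroValidatingInterceptor(Schema schema) {
    this.schema = schema;
  }

  @Override
  public void initialize() {
    reader = new GenericDatumReader<>(schema);
  }

  @Override
  public Event intercept(Event event) {
    try {
      BinaryDecoder decoder =
          DecoderFactory.get().binaryDecoder(event.getBody(), null);
      reader.read(null, decoder);   // throws if the body is malformed
      return event;                 // valid: pass the event through
    } catch (Exception e) {
      return null;                  // malformed: drop the event
    }
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> valid = new ArrayList<>(events.size());
    for (Event e : events) {
      if (intercept(e) != null) {
        valid.add(e);
      }
    }
    return valid;
  }

  @Override
  public void close() {
    // nothing to clean up
  }

  public static class Builder implements Interceptor.Builder {
    private Schema schema;

    @Override
    public void configure(Context context) {
      // "schema" is a made-up config key holding the Avro schema JSON
      schema = new Schema.Parser().parse(context.getString("schema"));
    }

    @Override
    public Interceptor build() {
      return new AvroValidatingInterceptor(schema);
    }
  }
}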
Thanks,
Hari

On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <[email protected]> wrote:
> OK, I got it. Thanks.
>
> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <[email protected]>:
>> Are you using the Kafka channel? The fix I mentioned was for the file
>> channel. Unfortunately, we don't plan to introduce something that drops
>> data in real time. That makes it too easy for a misconfiguration to cause
>> data loss. You'd have to ensure the data in the Kafka channel is valid.
>>
>> Thanks,
>> Hari
>>
>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <[email protected]> wrote:
>>> @Hari, you mean I need to ensure the data in Kafka is OK myself, right?
>>>
>>> How about a config that lets the user decide how to handle BACKOFF?
>>> For example, we could configure the max retry count for process(), and
>>> also whether to commit when that limit is exceeded. (In my Kafka case,
>>> when dirty data is encountered, committing the consumer offset would be
>>> better for me than an endless loop.)
>>>
>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan <[email protected]>:
>>>> We recently added functionality to the file channel integrity tool that
>>>> can be used to remove bad events from the channel - though you would
>>>> need to write some code to validate events. It will be in the soon to
>>>> be released 1.6.0.
>>>>
>>>> Thanks,
>>>> Hari
>>>>
>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <[email protected]> wrote:
>>>>> Hi all:
>>>>>
>>>>> My use case is KafkaChannel + HDFSEventSink.
>>>>>
>>>>> I found that SinkRunner.PollingRunner calls HDFSEventSink.process()
>>>>> in a while loop. Suppose a message in Kafka contains dirty data:
>>>>> HDFSEventSink.process() consumes the message from Kafka, throws an
>>>>> exception because of the *dirty data*, and the *Kafka offset doesn't
>>>>> commit*. The outer loop then calls HDFSEventSink.process() again, and
>>>>> because the Kafka offset hasn't changed, HDFSEventSink consumes the
>>>>> dirty data *again*. The bad loop *never stops*.
>>>>>
>>>>> *I want to know whether we have a mechanism to cover this case.*
>>>>> For example, a max retry count for a single HDFSEventSink.process()
>>>>> call, giving up when the limit is exceeded.
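
For reference, here is a simplified sketch of the driving loop described in the quoted question above. It is not Flume's actual SinkRunner code, just an illustration of why a rolled-back batch keeps coming back: the Kafka offset behind the channel never commits, so every call to process() sees the same dirty data.

import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;

// Illustration only: a stand-in for the runner that keeps calling the sink.
public class PollingLoopSketch {
  public static void drive(Sink sink, long backoffMillis) throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Sink.Status status = sink.process();  // pulls a batch from the channel
        if (status == Sink.Status.BACKOFF) {
          Thread.sleep(backoffMillis);        // channel empty or sink backing off
        }
      } catch (EventDeliveryException e) {
        // process() failed and rolled back its channel transaction, so the
        // Kafka offset did not commit; the next call re-reads the identical
        // dirty batch and the loop never makes progress past it.
        Thread.sleep(backoffMillis);
      }
    }
  }
}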
