Why do I design it like this? It's based on the following thoughts: I want to treat "KafkaChannel1 => Kafka Cluster => KafkaChannel2" as one channel, so that ScribeSource simply puts events into it and HDFSEventSink simply takes events from it. The kafka cluster provides stable storage and is transparent to event delivery between source and sink. (If I used "KafkaSource => MemoryChannel => HDFSEventSink" to export data from kafka to hdfs, the memory channel would be unstable and not transparent.) So the workflow is simply like this:

ScribeClient => ScribeSource => KafkaChannel (distributed) => HDFSEventSink => HDFS
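To make the layout concrete, here is a minimal sketch of the two agent configurations in Flume's properties format (agent names, port, broker list, topic, and HDFS path are made-up examples, not my real settings). Agent1 runs with a source and no sink, Agent2 runs with a sink and no source, and the KafkaChannel on both sides points at the same topic:

  # Agent1: ScribeSource writes straight into the Kafka-backed channel (no sink)
  agent1.sources = scribe-src
  agent1.channels = kafka-ch
  agent1.sources.scribe-src.type = org.apache.flume.source.scribe.ScribeSource
  agent1.sources.scribe-src.port = 1463
  agent1.sources.scribe-src.channels = kafka-ch
  agent1.channels.kafka-ch.type = org.apache.flume.channel.kafka.KafkaChannel
  agent1.channels.kafka-ch.brokerList = broker1:9092,broker2:9092
  agent1.channels.kafka-ch.zookeeperConnect = zk1:2181
  agent1.channels.kafka-ch.topic = flume-scribe

  # Agent2: HDFSEventSink drains the same topic (no source)
  agent2.channels = kafka-ch
  agent2.sinks = hdfs-sink
  agent2.channels.kafka-ch.type = org.apache.flume.channel.kafka.KafkaChannel
  agent2.channels.kafka-ch.brokerList = broker1:9092,broker2:9092
  agent2.channels.kafka-ch.zookeeperConnect = zk1:2181
  agent2.channels.kafka-ch.topic = flume-scribe
  agent2.sinks.hdfs-sink.type = hdfs
  agent2.sinks.hdfs-sink.channel = kafka-ch
  # the %Y-%m-%d escapes are what make the sink require a "timestamp" header
  agent2.sinks.hdfs-sink.hdfs.path = hdfs://namenode/flume/%Y-%m-%d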
Since an interceptor is attached after a source, *maybe I should add the filter interceptor after ScribeSource*, like this (a sketch of such an interceptor is at the end of this mail, below the quoted thread):

ScribeClient => ScribeSource => *FilterInterceptor* => KafkaChannel (distributed) => HDFSEventSink => HDFS

2015-04-18 2:51 GMT+08:00 Tao Li <[email protected]>:

> @Gwen @Hari
>
> My use case is as follows:
> ScribeClient => [Agent1: ScribeSource => KafkaChannel1] => Kafka Cluster
> => [Agent2: KafkaChannel2 => HDFSEventSink] => HDFS
>
> The bad case is as follows:
> My HDFSEventSink needs a header "*timestamp*", but some dirty data
> (written by mistake) in Kafka doesn't have the "timestamp" header, which
> causes the following BucketPath.escapeString call to throw a
> *NullPointerException*:
> String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
> timeZone, needRounding, roundUnit, roundValue, useLocalTime);
>
> *I think Gwen's second point is OK: we can add an interceptor to do the
> filter job.*
>
> But my flume agents are kind of special:
> Agent1 doesn't have a sink; it sends messages directly to the kafka
> cluster through KafkaChannel1.
> Agent2 doesn't have a source; it polls events directly from the kafka
> cluster through KafkaChannel2.
> Agent1 and Agent2 are different JVMs deployed on different nodes.
>
> *I don't know whether an agent with no sink or no source is reasonable.* But
> I have already built the whole workflow, and it works well for me in
> regular cases.
>
> *For Agent2, because it has no source, I can't use Gwen's interceptor
> suggestion.*
>
> 2015-04-18 2:30 GMT+08:00 Hari Shreedharan <[email protected]>:
>
>> What I think he means is a message in the channel that cannot be
>> serialized because it is malformed, causing the serializer to fail and
>> perhaps throw (think malformed Avro). Such a message would basically be
>> stuck in an infinite loop. So the workaround in (2) would work if using
>> a Kafka Source.
>>
>> Thanks,
>> Hari
>>
>>
>> On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <[email protected]> wrote:
>>
>>> OK, I got it. Thanks.
>>>
>>> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <[email protected]>:
>>>
>>>> Are you using the Kafka channel? The fix I mentioned was for the file
>>>> channel. Unfortunately, we don't plan to introduce something that
>>>> drops data in real time; that makes it too easy for a misconfig to
>>>> cause data loss. You'd have to ensure the data in the Kafka channel is
>>>> valid.
>>>>
>>>> Thanks,
>>>> Hari
>>>>
>>>>
>>>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <[email protected]>
>>>> wrote:
>>>>
>>>>> @Hari, you mean I need to ensure the data in kafka is OK by myself,
>>>>> right?
>>>>>
>>>>> How about a config to let the user decide how to handle BACKOFF?
>>>>> For example, we could configure the max retry count in process(), and
>>>>> also configure whether to commit or not when the max retry count is
>>>>> exceeded. (In my kafka case, when dirty data shows up, committing the
>>>>> consume offset would be much nicer for me than an endless loop.)
>>>>>
>>>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan <[email protected]>
>>>>> :
>>>>>
>>>>>> We recently added functionality to the file channel integrity tool
>>>>>> that can be used to remove bad events from the channel - though you
>>>>>> would need to write some code to validate events. It will be in the
>>>>>> soon-to-be-released 1.6.0.
>>>>>>
>>>>>> Thanks,
>>>>>> Hari
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all:
>>>>>>>
>>>>>>> My use case is KafkaChannel + HDFSEventSink.
>>>>>>>
>>>>>>> I found that SinkRunner.PollingRunner calls HDFSEventSink.process()
>>>>>>> in a while loop. For example, a message in kafka contains dirty
>>>>>>> data, so HDFSEventSink.process() consumes the message from kafka,
>>>>>>> throws an exception because of the *dirty data*, and the *kafka
>>>>>>> offset doesn't commit*. The outer loop then calls
>>>>>>> HDFSEventSink.process() again. Because the kafka offset hasn't
>>>>>>> changed, HDFSEventSink consumes the dirty data *again*. The bad
>>>>>>> loop *never stops*.
>>>>>>>
>>>>>>> *I want to know whether we have a mechanism to cover this case.*
>>>>>>> For example, a max retry count for a single HDFSEventSink.process()
>>>>>>> call, giving up when the limit is exceeded.
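Here is a minimal sketch of the filter interceptor I mentioned above, assuming we simply drop any event that lacks the "timestamp" header (the class and package names are made up; returning null from intercept() is how a Flume interceptor discards an event):

  package com.example.flume;  // hypothetical package

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.interceptor.Interceptor;

  public class TimestampFilterInterceptor implements Interceptor {

    @Override
    public void initialize() {
      // no state to set up
    }

    @Override
    public Event intercept(Event event) {
      // Drop the event if the "timestamp" header is missing, so
      // BucketPath.escapeString never sees a null timestamp downstream.
      return event.getHeaders().containsKey("timestamp") ? event : null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
      List<Event> kept = new ArrayList<Event>(events.size());
      for (Event e : events) {
        if (intercept(e) != null) {
          kept.add(e);
        }
      }
      return kept;
    }

    @Override
    public void close() {
      // nothing to release
    }

    public static class Builder implements Interceptor.Builder {
      @Override
      public Interceptor build() {
        return new TimestampFilterInterceptor();
      }

      @Override
      public void configure(Context context) {
        // no options for this sketch
      }
    }
  }

It would be wired onto ScribeSource in Agent1 like this (the interceptor name "ts-filter" is just an example):

  agent1.sources.scribe-src.interceptors = ts-filter
  agent1.sources.scribe-src.interceptors.ts-filter.type = com.example.flume.TimestampFilterInterceptor$Builder

An alternative to dropping events would be the stock org.apache.flume.interceptor.TimestampInterceptor, which stamps the current time onto events that are missing the header instead of discarding them.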
