OK, thank you very much.

2015-04-18 3:26 GMT+08:00 Gwen Shapira <[email protected]>:
> This looks like the right design to me.
>
> On Fri, Apr 17, 2015 at 12:22 PM, Tao Li <[email protected]> wrote:
> > I designed it like this based on the following thinking: I want to treat
> > "KafkaChannel1 => Kafka Cluster => KafkaChannel2" as one channel, so
> > ScribeSource simply puts events into it and HDFSEventSink simply takes
> > events from it. The Kafka cluster provides stable storage and is
> > transparent to event delivery between source and sink. (If I instead used
> > "KafkaSource => MemoryChannel => HDFSEventSink" to export data from Kafka
> > to HDFS, the memory channel would be unstable and not transparent.)
> > So the workflow is simply:
> > ScribeClient => ScribeSource => KafkaChannel (distributed) => HDFSEventSink => HDFS
> >
> > Since interceptors sit right after the source, maybe I should add the
> > filter interceptor after ScribeSource, like this:
> > ScribeClient => ScribeSource => FilterInterceptor => KafkaChannel (distributed) => HDFSEventSink => HDFS
> >
> > 2015-04-18 2:51 GMT+08:00 Tao Li <[email protected]>:
> >>
> >> @Gwen @Hari
> >>
> >> My use case is as follows:
> >> ScribeClient => [Agent1: ScribeSource => KafkaChannel1] => Kafka Cluster
> >> => [Agent2: KafkaChannel2 => HDFSEventSink] => HDFS
> >>
> >> The bad case is as follows: my HDFSEventSink needs a "timestamp" header,
> >> but some dirty data (written by mistake) in Kafka doesn't have the
> >> "timestamp" header, which causes the following BucketPath.escapeString
> >> call to throw a NullPointerException:
> >> String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
> >> timeZone, needRounding, roundUnit, roundValue, useLocalTime);
> >>
> >> I think Gwen's second point is OK; we can add an interceptor to do the
> >> filtering.
> >>
> >> But my Flume agents are somewhat special:
> >> Agent1 has no sink; it sends messages directly to the Kafka cluster through KafkaChannel1.
> >> Agent2 has no source; it polls events directly from the Kafka cluster through KafkaChannel2.
> >> Agent1 and Agent2 run in different JVMs and are deployed on different nodes.
> >>
> >> I don't know whether an agent with no sink or no source is reasonable,
> >> but I have already built the whole workflow and it works well for me in
> >> regular cases.
> >>
> >> Because Agent2 has no source, I can't use Gwen's interceptor suggestion there.
> >>
> >> 2015-04-18 2:30 GMT+08:00 Hari Shreedharan <[email protected]>:
> >>>
> >>> What I think he means is that a message in the channel that cannot be
> >>> serialized by the serializer because it is malformed causes the
> >>> serializer to fail and perhaps throw (think malformed Avro). Such a
> >>> message would basically be stuck in an infinite loop. So the workaround
> >>> in (2) would work if using a Kafka Source.
> >>>
> >>> Thanks,
> >>> Hari
> >>>
> >>> On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <[email protected]> wrote:
> >>>>
> >>>> OK, I got it. Thanks.
> >>>>
> >>>> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <[email protected]>:
> >>>>>
> >>>>> Are you using the Kafka channel? The fix I mentioned was for the file
> >>>>> channel. Unfortunately, we don't plan to introduce something that
> >>>>> drops data in real time; that makes it too easy for a
> >>>>> misconfiguration to cause data loss. You'd have to ensure the data in
> >>>>> the Kafka channel is valid.
> >>>>>
> >>>>> Thanks,
> >>>>> Hari
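For concreteness, here is a minimal sketch of the FilterInterceptor idea described earlier in the thread, written against Flume's standard org.apache.flume.interceptor.Interceptor API. It would run on Agent1, right after ScribeSource, and drop any event that lacks the "timestamp" header before it reaches KafkaChannel1. The class and package names are made up for illustration; only the "timestamp" header key comes from the thread.

    // Hypothetical filter interceptor (class and package names are placeholders):
    // drops any event that arrives without a "timestamp" header, so dirty records
    // never reach KafkaChannel1.
    package com.example.flume;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    public class TimestampFilterInterceptor implements Interceptor {

      @Override
      public void initialize() {
        // no state to set up
      }

      @Override
      public Event intercept(Event event) {
        // Returning null tells Flume to drop the event.
        String ts = event.getHeaders().get("timestamp");
        return (ts == null || ts.isEmpty()) ? null : event;
      }

      @Override
      public List<Event> intercept(List<Event> events) {
        List<Event> kept = new ArrayList<Event>(events.size());
        for (Event e : events) {
          if (intercept(e) != null) {
            kept.add(e);
          }
        }
        return kept;
      }

      @Override
      public void close() {
        // nothing to clean up
      }

      public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
          return new TimestampFilterInterceptor();
        }

        @Override
        public void configure(Context context) {
          // the header name could be made configurable here
        }
      }
    }

It would be attached to ScribeSource in Agent1's configuration by listing it under the source's interceptors key and pointing interceptors.<name>.type at com.example.flume.TimestampFilterInterceptor$Builder, with whatever agent and component names the real deployment uses.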
> >>>>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <[email protected]> wrote:
> >>>>>>
> >>>>>> @Hari, you mean I need to ensure the data in Kafka is OK myself, right?
> >>>>>>
> >>>>>> How about a config option that lets the user decide how to handle
> >>>>>> BACKOFF? For example, we could configure the maximum number of
> >>>>>> retries in process(), and also configure whether or not to commit
> >>>>>> when the maximum is exceeded. (In my Kafka case, when dirty data is
> >>>>>> encountered, committing the consumer offset would be nicer for me
> >>>>>> than an endless loop.)
> >>>>>>
> >>>>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan <[email protected]>:
> >>>>>>>
> >>>>>>> We recently added functionality to the file channel integrity tool
> >>>>>>> that can be used to remove bad events from the channel, though you
> >>>>>>> would need to write some code to validate events. It will be in the
> >>>>>>> soon-to-be-released 1.6.0.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Hari
> >>>>>>>
> >>>>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>> Hi all:
> >>>>>>>>
> >>>>>>>> My use case is KafkaChannel + HDFSEventSink.
> >>>>>>>>
> >>>>>>>> I found that SinkRunner.PollingRunner calls HDFSEventSink.process()
> >>>>>>>> in a while loop. For example, if a message in Kafka contains dirty
> >>>>>>>> data, HDFSEventSink.process() consumes the message from Kafka,
> >>>>>>>> throws an exception because of the dirty data, and the Kafka offset
> >>>>>>>> is not committed. The outer loop then calls HDFSEventSink.process()
> >>>>>>>> again. Because the Kafka offset hasn't changed, HDFSEventSink
> >>>>>>>> consumes the same dirty data again, and the bad loop never stops.
> >>>>>>>>
> >>>>>>>> I want to know whether we have a mechanism to cover this case, for
> >>>>>>>> example a maximum retry count for a single HDFSEventSink.process()
> >>>>>>>> call, giving up when the limit is exceeded.
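To make the loop in the original question concrete, here is a stripped-down sketch of the polling behaviour being described. It is not Flume's actual SinkRunner.PollingRunner code, only the same semantics boiled down under assumed simplifications: the sink's process() call wraps a channel transaction, a thrown EventDeliveryException means that transaction was rolled back, and because the Kafka offset is only committed on success, the next iteration sees the same malformed event.

    // Simplified illustration of the retry behaviour described above. This is
    // NOT Flume's actual SinkRunner code, just the same semantics reduced to a
    // sketch: a process() call that always throws (for example
    // BucketPath.escapeString hitting a missing "timestamp" header) rolls back
    // the channel transaction, so the Kafka offset never advances and the same
    // event comes back on the next pass.
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Sink;

    public class PollingLoopSketch {

      public static void drive(Sink sink) throws InterruptedException {
        while (true) {                            // runs for the lifetime of the agent
          try {
            Sink.Status status = sink.process();  // take from channel, write to HDFS, commit
            if (status == Sink.Status.BACKOFF) {
              Thread.sleep(1000);                 // nothing to do right now: back off
            }
          } catch (EventDeliveryException e) {
            // The transaction was rolled back inside process(); the offending
            // event (and the uncommitted Kafka offset) will be seen again.
            Thread.sleep(1000);
          }
          // There is no retry counter or "commit and skip" option here, so a
          // permanently malformed event is re-consumed forever, which is the
          // loop described at the start of this thread.
        }
      }
    }

With no retry counter or commit-and-skip option at this level, the ways to break the cycle are the ones discussed above: keep bad records from entering Kafka in the first place (the FilterInterceptor on Agent1) or remove them out of band.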
