Hi Alex, Unfortunately interceptors can only be applied to the sources. At this time, we do not have support for sink-side interceptors. There is FLUME-1207 that tracks this request.
https://issues.apache.org/jira/browse/FLUME-1207 Regards, Arvind Prabhakar On Wed, Oct 2, 2013 at 4:45 AM, Alexandru Sicoe <[email protected]> wrote: > Hello everyone, > > My setup is the following: I am pulling xml messages from RabbitMQ via a > RabbitMQ Flume Source. Attached to this Source is an Interceptor which > parses the xml into csv and eventually the csv message is dumped to a csv > file by a the Sync. Both the Interceptor and Sync are my own custom > implementations. > > A simple sketch of this would be: > rabbitmq -> source -> interceptor -> channel -> sync -> file.csv > > This works fine! > > Now I need to figure how to also dump the raw xml content to an xml file > as well as the parsed csv content. > > I have devised several methods to achieve this. I would like some advice > on whether the methods are possible and which one is the best? > > 1. Pulling the xml twice from RabbitMQ > > rabbitmq -> source1 -> interceptor -> channel -> sync -> file.csv > -> source2 -> channel -> sync -> file.xml > > 2. Pulling the xml once but generate both a csv and an xml from the > Interceptor. > > rabbitmq -> source1 -> interceptor -> channel -> sync -> file.csv > -> channel -> sync -> > file.xml > > 3. Pulling the xml once and having a fan out source and an Interceptor > before the sync. > > rabbitmq -> source1 -> channel -> interceptor -> sync -> file.csv > -> channel -> sync -> file.xml > > In my opinion option 3 would be the best since it doesn't require pulling > the xml twice from RabbitMQ and doesn't require any change in the code that > I wrote. The problem is that I'm not sure it is possible. I tried the > following config file without success: > > agent1.sources = rabbitmq-source1 > agent1.channels = memch1 > agent1.sinks = Console > > agent1.sources.rabbitmq-source1.channels = memch1 > agent1.sources.rabbitmq-source1.type = > org.apache.flume.source.rabbitmq.RabbitMQSource > agent1.sources.rabbitmq-source1.hostname = localhost > agent1.sources.rabbitmq-source1.queuename = hello > > agent1.sinks.Console.interceptors = interceptor1 > agent1.sinks.Console.interceptors.interceptor1.type = > resilient.flume.MyInterceptor$Builder > agent1.sinks.Console.channel = memch1 > agent1.sinks.Console.type = logger > > agent1.channels.memch1.type = memory > > Am I doing something wrong? or is option 3 not possible at all? > > Thanks, > Alex >
