Hi Gonzalo,

> It should work as you say.
>
> I wonder how do you know the events are "empty", do you get new lines
> in the console consumer?

Yes, the consumer shows new lines.

> Also, the example payload you show looks like avro but not the
> standard FlumeEvent, can you show us your agent configuration

Yes. The agent configuration is:

postal.sources = srcdir
postal.channels = memch memch-sender memch-authuser

##############
### source ###
##############

## srcdir ##
# Describe/configure the Source srcdir
postal.sources.srcdir.type = spooldir
postal.sources.srcdir.spoolDir = /var/log/postal/source/rout/
postal.sources.srcdir.fileSuffix = .COMPLETED
postal.sources.srcdir.trackerDir = .flumespool
postal.sources.srcdir.consumeOrder = oldest
postal.sources.srcdir.maxBackoff = 4000
postal.sources.srcdir.batchSize = 100
postal.sources.srcdir.inputCharset = ISO8859-1
postal.sources.srcdir.deserializer = LINE
postal.sources.srcdir.deserializer.maxLineLength = 10000
postal.sources.srcdir.deserializer.outputCharset = UTF-8
postal.sources.srcdir.interceptors = logDeserial-morph

# interceptor
postal.sources.srcdir.interceptors.logDeserial-morph.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
postal.sources.srcdir.interceptors.logDeserial-morph.morphlineFile = /opt/sds/ingestion/conf/morph.conf
postal.sources.srcdir.interceptors.logDeserial-morph.morphlineId = morphline

# Channel selector configuration
postal.sources.srcdir.selector.type = multiplexing
postal.sources.srcdir.selector.header = event_route
postal.sources.srcdir.selector.mapping.SENDER = memch memch-sender
postal.sources.srcdir.selector.mapping.AUTHUSER = memch memch-authuser
postal.sources.srcdir.selector.default = memch

###############
### channel ###
###############

## memch ##
postal.channels.memch.type = org.apache.flume.channel.kafka.KafkaChannel
postal.channels.memch.transactionCapacity = 100
postal.channels.memch.kafka.bootstrap.servers = localhost:9092
postal.channels.memch.kafka.topic = messages
postal.channels.memch.parseAsFlumeEvent = false
postal.channels.memch.kafka.consumer.auto.offset.reset = latest
postal.channels.memch.kafka.consumer.group.id = cg-messages

## memch-sender ##
postal.channels.memch-sender.type = org.apache.flume.channel.kafka.KafkaChannel
postal.channels.memch-sender.transactionCapacity = 100
postal.channels.memch-sender.kafka.bootstrap.servers = localhost:9092
postal.channels.memch-sender.kafka.topic = sender
postal.channels.memch-sender.parseAsFlumeEvent = false
postal.channels.memch-sender.kafka.consumer.auto.offset.reset = latest
postal.channels.memch-sender.kafka.consumer.groupId = cg-sender

## memch-authuser ##
postal.channels.memch-authuser.type = org.apache.flume.channel.kafka.KafkaChannel
postal.channels.memch-authuser.transactionCapacity = 100
postal.channels.memch-authuser.kafka.bootstrap.servers = localhost:9092
postal.channels.memch-authuser.kafka.topic = authuser
postal.channels.memch-authuser.parseAsFlumeEvent = false
postal.channels.memch-authuser.kafka.consumer.auto.offset.reset = latest
postal.channels.memch-authuser.kafka.consumer.groupId = cg-authuser

###############
### Mapping ###
###############

postal.sources.srcdir.channels = memch memch-sender memch-authuser

Do you need 'morph.conf'?

Regards.

>
> Gonzalo
