flume.conf (cleaned up for security and brevity by removing additional sources, channels, and sinks)
# Name the components on this agent
collector.sources = CustomSource
collector.channels = HdfsChannel KafkaChannel
collector.sinks = HdfsSink KafkaSink

# Describe/configure the custom Avro source
collector.sources.CustomSource.type = com.flume.CustomSource
collector.sources.CustomSource.channels = HdfsChannel KafkaChannel
collector.sources.CustomSource.bind = 0.0.0.0
collector.sources.CustomSource.port = 9898
collector.sources.CustomSource.schemaFolder = /usr/lib/flume-ng/schemas
collector.sources.CustomSource.selector.type = multiplexing
collector.sources.CustomSource.selector.header = recordType
# required channel mappings
collector.sources.CustomSource.selector.mapping.MyRecord = HdfsChannel
# optional channel mappings
collector.sources.CustomSource.selector.optional.MyRecord = KafkaChannel

# HdfsChannel channel config
collector.channels.HdfsChannel.type = file
collector.channels.HdfsChannel.useDualCheckpoints = true
collector.channels.HdfsChannel.checkpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/checkpoint
collector.channels.HdfsChannel.backupCheckpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/backup-checkpoint
collector.channels.HdfsChannel.dataDirs = /mnt/persistent/0/flume-ng-data/Hdfsdata/logs
collector.channels.HdfsChannel.capacity = 1000000
collector.channels.HdfsChannel.transactionCapacity = 50000
collector.channels.HdfsChannel.write-timeout = 60
collector.channels.HdfsChannel.keep-alive = 30

# HdfsSink sink config
collector.sinks.HdfsSink.type = hdfs
collector.sinks.HdfsSink.channel = HdfsChannel
collector.sinks.HdfsSink.hdfs.fileType = DataStream
collector.sinks.HdfsSink.serializer = CustomSerializer
collector.sinks.HdfsSink.serializer.schemaFolder = /usr/lib/flume-ng/schemas
collector.sinks.HdfsSink.serializer.syncIntervalBytes = 4096000
collector.sinks.HdfsSink.serializer.compressionCodec = snappy
collector.sinks.HdfsSink.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/myrecord
collector.sinks.HdfsSink.hdfs.rollSize = 0
collector.sinks.HdfsSink.hdfs.rollInterval = 1200
collector.sinks.HdfsSink.hdfs.rollCount = 0
collector.sinks.HdfsSink.hdfs.callTimeout = 60000
collector.sinks.HdfsSink.hdfs.batchSize = 10000

# KafkaChannel channel config
collector.channels.KafkaChannel.type = memory
collector.channels.KafkaChannel.capacity = 1500000
collector.channels.KafkaChannel.transactionCapacity = 50000
collector.channels.KafkaChannel.write-timeout = 60
collector.channels.KafkaChannel.keep-alive = 30

# KafkaSink sink config
collector.sinks.KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
collector.sinks.KafkaSink.channel = KafkaChannel
collector.sinks.KafkaSink.zk.connect = zk-1.com,zk-2.com,zk-3.com
collector.sinks.KafkaSink.metadata.broker.list = kafka-1.com:9092,kafka-2.com:9092,kafka-3.com:9092
collector.sinks.KafkaSink.topic = MyRecord
collector.sinks.KafkaSink.batch.num.messages = 1000
collector.sinks.KafkaSink.producer.type = async
collector.sinks.KafkaSink.request.required.acks = 0
collector.sinks.KafkaSink.serializer.class = kafka.serializer.DefaultEncoder
collector.sinks.KafkaSink.key.serializer.class = kafka.serializer.StringEncoder
collector.sinks.KafkaSink.partition.key = keyName

-Kushal
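For context, the multiplexing selector in the configuration above routes on the "recordType" event header, so the custom source has to stamp that header on every event it hands to the channel processor. Below is a minimal sketch of that step against the Flume 1.4 API; the class name, payload, and the assumption that the source extends org.apache.flume.source.AbstractSource are illustrative, not the actual com.flume.CustomSource.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

// Illustrative only: shows where the routing header is set, not the real CustomSource.
public class RecordTypeTaggingSource extends AbstractSource implements PollableSource {

  @Override
  public Status process() throws EventDeliveryException {
    Map<String, String> headers = new HashMap<String, String>();
    // The multiplexing selector reads this header; "MyRecord" maps to
    // HdfsChannel (required) and KafkaChannel (optional) in the config above.
    headers.put("recordType", "MyRecord");

    Event event = EventBuilder.withBody(
        "example payload".getBytes(StandardCharsets.UTF_8), headers);

    // The channel processor applies the selector and fans the event out.
    getChannelProcessor().processEvent(event);
    return Status.READY;
  }
}

With that header in place, MyRecord events should always land in HdfsChannel and only best-effort in KafkaChannel, per the required/optional mappings above.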
On Mar 17, 2015, at 2:31 PM, Hari Shreedharan <[email protected]> wrote:

I have seen one other report recently with the optional mapping issues. Can you also send your configuration? I'd like to investigate this and figure out what the issue is.

Thanks,
Hari Shreedharan

On Mar 17, 2015, at 2:24 PM, Mangtani, Kushal <[email protected]> wrote:

Hello,

We are using Flume in our prod env to ingest data. A while back, we decided to extend the functionality and added Kafka for real-time monitoring. So, the Flume source forks off and deposits the data into two separate channels: one is HDFS (required mapping) and the other is Kafka (optional mapping). We have made the Kafka channel an optional selector mapping (http://flume.apache.org/releases/content/1.4.0/FlumeUserGuide.html#fan-out-flow) so that any issue with Kafka should not block the HDFS pipeline. However, I have noticed this never works out: any issue with the Kafka cluster eventually also brings down the HDFS ingestion. So, my question is whether the optional channel mapping in the Flume source code does not work correctly, or whether the Kafka sink / Kafka cluster I am using is outdated. Any inputs will be appreciated.

Env:
* Ubuntu 12.04
* CDH 5, Flume 1.4
* Kafka src download - 2.9.1-0.8.1.1
* Custom Flume-Kafka sink: https://github.com/baniuyao/flume-ng-kafka-sink

- Kushal
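For reference, the fan-out section of the Flume User Guide linked above describes the contract being relied on here: a failed write to a required channel is surfaced back to the source, while a failed write to an optional channel is ignored and not retried. The sketch below only illustrates that documented contract in simplified form; it is not Flume's actual ChannelProcessor code, and the method and class names are made up for the example.

import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

// Simplified illustration of required vs. optional fan-out semantics.
public class FanOutSketch {

  static void deliver(Event event, List<Channel> required, List<Channel> optional) {
    for (Channel ch : required) {
      Transaction tx = ch.getTransaction();
      tx.begin();
      try {
        ch.put(event);
        tx.commit();
      } catch (ChannelException e) {
        tx.rollback();
        // A required-channel failure propagates to the source and stalls the flow.
        throw e;
      } finally {
        tx.close();
      }
    }
    for (Channel ch : optional) {
      Transaction tx = ch.getTransaction();
      tx.begin();
      try {
        ch.put(event);
        tx.commit();
      } catch (Exception e) {
        tx.rollback();
        // An optional-channel failure is swallowed; the HDFS path should continue.
      } finally {
        tx.close();
      }
    }
  }
}

One possibility that may be worth ruling out against the configuration above: the optional KafkaChannel is a memory channel with keep-alive = 30, so when it is full each put can wait up to 30 seconds before the ChannelException is thrown and discarded, which could slow the source noticeably even though the HDFS path is never formally failed.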
