flume.conf (cleaned up for security and trimmed for brevity by removing the additional sources, channels, and sinks)

# Name the components on this agent
collector.sources = CustomSource
collector.channels = HdfsChannel KafkaChannel

collector.sinks = HdfsSink KafkaSink

# Describe/configure the source CustomSource
collector.sources.CustomSource.type = com.flume.CustomSource
collector.sources.CustomSource.channels = HdfsChannel KafkaChannel
collector.sources.CustomSource.bind = 0.0.0.0
collector.sources.CustomSource.port = 9898
collector.sources.CustomSource.schemaFolder = /usr/lib/flume-ng/schemas
collector.sources.CustomSource.selector.type = multiplexing
collector.sources.CustomSource.selector.header = recordType
# required channel mappings
collector.sources.CustomSource.selector.mapping.MyRecord = HdfsChannel
# optional channel mappings
collector.sources.CustomSource.selector.optional.MyRecord = KafkaChannel
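# Note: with a multiplexing selector, a failed write to an optional channel is
# logged and ignored, while a failed write to a required channel fails the
# source transaction.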

# HdfsChannel channel config
collector.channels.HdfsChannel.type = file
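# dual checkpoints keep a backup so the channel can recover without a full
# log replay if the primary checkpoint is corrupted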
collector.channels.HdfsChannel.useDualCheckpoints = true
collector.channels.HdfsChannel.checkpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/checkpoint
collector.channels.HdfsChannel.backupCheckpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/backup-checkpoint
collector.channels.HdfsChannel.dataDirs = /mnt/persistent/0/flume-ng-data/Hdfsdata/logs
collector.channels.HdfsChannel.capacity = 1000000
collector.channels.HdfsChannel.transactionCapacity = 50000
collector.channels.HdfsChannel.write-timeout = 60
collector.channels.HdfsChannel.keep-alive = 30

# HdfsSink sink config
collector.sinks.HdfsSink.type = hdfs
collector.sinks.HdfsSink.channel = HdfsChannel
collector.sinks.HdfsSink.hdfs.fileType = DataStream
collector.sinks.HdfsSink.serializer = CustomSerializer
collector.sinks.HdfsSink.serializer.schemaFolder = /usr/lib/flume-ng/schemas
collector.sinks.HdfsSink.serializer.syncIntervalBytes = 4096000
collector.sinks.HdfsSink.serializer.compressionCodec = snappy
collector.sinks.HdfsSink.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/myrecord
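# rollSize/rollCount of 0 disable size- and count-based rolling, so files roll
# only on the 1200-second interval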
collector.sinks.HdfsSink.hdfs.rollSize = 0
collector.sinks.HdfsSink.hdfs.rollInterval = 1200
collector.sinks.HdfsSink.hdfs.rollCount = 0
collector.sinks.HdfsSink.hdfs.callTimeout = 60000
collector.sinks.HdfsSink.hdfs.batchSize = 10000

# KafkaChannel channel config
collector.channels.KafkaChannel.type = memory
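# a memory channel buffers events in RAM only, so anything queued here is lost
# if the agent stops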
collector.channels.KafkaChannel.capacity = 1500000
collector.channels.KafkaChannel.transactionCapacity = 50000
collector.channels.KafkaChannel.write-timeout = 60
collector.channels.KafkaChannel.keep-alive = 30

# KafkaSink sink config
collector.sinks.KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
collector.sinks.KafkaSink.channel = KafkaChannel
collector.sinks.KafkaSink.zk.connect = zk-1.com,zk-2.com,zk-3.com
collector.sinks.KafkaSink.metadata.broker.list = kafka-1.com:9092,kafka-2.com:9092,kafka-3.com:9092
collector.sinks.KafkaSink.topic = MyRecord
collector.sinks.KafkaSink.batch.num.messages = 1000
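# async producer with request.required.acks = 0 is fire-and-forget: the sink
# gets no acknowledgement from the brokers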
collector.sinks.KafkaSink.producer.type = async
collector.sinks.KafkaSink.request.required.acks = 0
collector.sinks.KafkaSink.serializer.class = kafka.serializer.DefaultEncoder
collector.sinks.KafkaSink.key.serializer.class = kafka.serializer.StringEncoder
collector.sinks.KafkaSink.partition.key = keyName


-Kushal

On Mar 17, 2015, at 2:31 PM, Hari Shreedharan <[email protected]> wrote:

I have seen one other report recently with the optional mapping issues. Can you 
also send your configuration? I’d like to investigate this and figure out what 
the issue is.

Thanks,
Hari Shreedharan




On Mar 17, 2015, at 2:24 PM, Mangtani, Kushal <[email protected]> wrote:

Hello,

We are using Flume in our prod env to ingest data. A while back, we decided to extend the functionality and added Kafka for real-time monitoring.
So, the Flume source fans the data out into two separate channels: one is HDFS (required mapping) and the other is Kafka (optional mapping). We made the Kafka channel an optional selector mapping (http://flume.apache.org/releases/content/1.4.0/FlumeUserGuide.html#fan-out-flow) so that any issue with Kafka should not block the HDFS pipeline.
However, I have noticed this is not what actually happens. Any issue with the Kafka cluster eventually brings down the HDFS ingestion as well. So my question is: does the optional channel mapping in the Flume source code not work correctly, or is the kafka-sink/Kafka cluster I am using outdated? Any inputs will be appreciated.
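For reference, a minimal sketch of the fan-out selector setup described above (agent, source, and channel names here are placeholders, not our actual config):

agent.sources.src1.selector.type = multiplexing
agent.sources.src1.selector.header = recordType
# required mapping: a failure on this channel fails the source transaction
agent.sources.src1.selector.mapping.MyRecord = hdfsChannel
# optional mapping: failures on this channel are supposed to be ignored
agent.sources.src1.selector.optional.MyRecord = kafkaChannel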

Env:

  *   Ubuntu 12.04
  *   CDH 5 flume 1.4
  *   Kafka Src Download - 2.9.1-0.8.1.1
  *   Using Custom Flume-Kafka Sink: https://github.com/baniuyao/flume-ng-kafka-sink

- Kushal

