Mike Percy contributed a most excellent blog post on this topic. Have you had a chance to read over this? https://blogs.apache.org/flume/entry/flume_performance_tuning_part_1
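
The excerpt below summarizes the key trade-off. As a rough, hypothetical illustration (the component names and values here are made up for this sketch, not taken from the configs later in this thread), the knobs involved look like this:

    # Illustrative sketch of the batch-size trade-off on a single agent.
    # Smaller sink batch sizes commit transactions more often: lower latency
    # and fewer re-sent events after a failure, but lower throughput.
    # Larger batch sizes amortize transaction overhead: higher throughput,
    # higher latency, and more potential duplicates if a transaction fails.
    agent.channels.ch.type = file
    # transactionCapacity must be >= the largest sink batch size on this channel
    agent.channels.ch.transactionCapacity = 1000
    agent.sinks.hbase.type = org.apache.flume.sink.hbase.AsyncHBaseSink
    agent.sinks.hbase.channel = ch
    # latency-oriented setting:
    agent.sinks.hbase.batchSize = 10
    # throughput-oriented alternative (more duplicates possible on failure):
    # agent.sinks.hbase.batchSize = 1000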
"* Tuning the batch size trades throughput vs. latency and duplication under failure. With a small batch size, throughput decreases, but the risk of event duplication is reduced if a failure were to occur. With a large batch size, you get much better throughput, but increased latency, and in the case of a transaction failure, the number of possible duplicates increases." * On Wed, Apr 24, 2013 at 11:04 PM, David Quigley <[email protected]>wrote: > Thanks all, > > Added a dedicated channel for hdfs and hbase and everything all events are > making it into their sinks now. > > What is the best tuning strategy for getting events from an exec source -> > avro sink -> avro source -> hbase sink with the least amount of latency? > Will batch size and transaction size have any effect on this latency? > > Thanks again > > > On Mon, Apr 22, 2013 at 10:58 AM, Israel Ekpo <[email protected]> wrote: > >> David, >> >> In addition to what has already been said, if you take a look at your >> flume log files, you should be able to see exception messages that explain >> why this is happening. >> >> >> >> >> On 22 April 2013 11:11, David Quigley <[email protected]> wrote: >> >>> Hi, >>> >>> I am using flume to write events from webserver to both HDFS and HBase. >>> All events are being written to HDFS but only about half are making it into >>> HBase. Is there anything in my configurations which would be causing the >>> issue? I have both HDFS and HBase sink reading from the same File Channel. >>> Is it better to have one channel per sink? >>> >>> Thanks, >>> Dave >>> >>> >>> # flume config on web server >>> agent.sources = sourceLog >>> agent.sources.sourceLog.type = exec >>> agent.sources.sourceLog.command = tail -F /var/log/clickServer/clicks_out >>> agent.sources.sourceLog.batchSize = 100 >>> agent.sources.sourceLog.channels = fileChannel >>> >>> agent.sources.sourceLog.interceptors = itime ihost idatatype idataparent >>> agent.sources.sourceLog.interceptors.itime.type = timestamp >>> agent.sources.sourceLog.interceptors.ihost.type = host >>> agent.sources.sourceLog.interceptors.ihost.useIP = false >>> agent.sources.sourceLog.interceptors.ihost.hostHeader = host >>> agent.sources.sourceLog.interceptors.idatatype.type = static >>> agent.sources.sourceLog.interceptors.idatatype.key = data_type >>> agent.sources.sourceLog.interceptors.idatatype.value = clicks >>> agent.sources.sourceLog.interceptors.idataparent.type = static >>> agent.sources.sourceLog.interceptors.idataparent.key = data_parent >>> agent.sources.sourceLog.interceptors.idataparent.value = * >>> >>> agent.channels = fileChannel >>> agent.channels.fileChannel.type = file >>> agent.channels.fileChannel.transactionCapacity = 100 >>> agent.channels.fileChannel.checkpointDir = >>> /opt/flume/file-channel/checkpoint >>> agent.channels.fileChannel.dataDirs = /opt/flume/file-channel/data >>> >>> agent.sinks = AvroSink_main AvroSink_backup_1 AvroSink_backup_2 >>> AvroSink_backup_3 >>> agent.sinks.AvroSink_main.type = avro >>> agent.sinks.AvroSink_main.channel = fileChannel >>> agent.sinks.AvroSink_main.hostname = * >>> agent.sinks.AvroSink_main.port = 35873 >>> agent.sinks.AvroSink_main.batchSize = 100 >>> agent.sinks.AvroSink_backup_1.type = avro >>> agent.sinks.AvroSink_backup_1.channel = fileChannel >>> agent.sinks.AvroSink_backup_1.hostname = * >>> agent.sinks.AvroSink_backup_1.port = 35873 >>> agent.sinks.AvroSink_backup_1.batchSize = 100 >>> agent.sinks.AvroSink_backup_2.type = avro >>> agent.sinks.AvroSink_backup_2.channel = fileChannel >>> 
>>> agent.sinks.AvroSink_backup_2.hostname = *
>>> agent.sinks.AvroSink_backup_2.port = 35873
>>> agent.sinks.AvroSink_backup_2.batchSize = 100
>>> agent.sinks.AvroSink_backup_3.type = avro
>>> agent.sinks.AvroSink_backup_3.channel = fileChannel
>>> agent.sinks.AvroSink_backup_3.hostname = *
>>> agent.sinks.AvroSink_backup_3.port = 35873
>>> agent.sinks.AvroSink_backup_3.batchSize = 100
>>>
>>> agent.sinkgroups = failover
>>> agent.sinkgroups.failover.sinks = AvroSink_main AvroSink_backup_1 AvroSink_backup_2 AvroSink_backup_3
>>> agent.sinkgroups.failover.processor.type = failover
>>> agent.sinkgroups.failover.processor.priority.AvroSink_main = 10
>>> agent.sinkgroups.failover.processor.priority.AvroSink_backup_1 = 5
>>> agent.sinkgroups.failover.processor.priority.AvroSink_backup_2 = 3
>>> agent.sinkgroups.failover.processor.priority.AvroSink_backup_3 = 1
>>> agent.sinkgroups.failover.processor.maxpenalty = 10000
>>>
>>>
>>> # flume config on hadoop cluster
>>> collector.sources=AvroIn
>>> collector.sources.AvroIn.type=avro
>>> collector.sources.AvroIn.bind=0.0.0.0
>>> collector.sources.AvroIn.port=35873
>>> collector.sources.AvroIn.channels=fileChannel
>>>
>>> collector.channels=fileChannel
>>> collector.channels.fileChannel.type=FILE
>>> collector.channels.fileChannel.capacity=1000
>>> collector.channels.fileChannel.checkpointDir=~/.flume/file-channel/checkpoint_%{data_type}
>>> collector.channels.fileChannel.dataDirs=~/.flume/file-channel/data_%{data_type}
>>>
>>> collector.sinks=hbaseSink hdfsSink
>>> collector.sinks.hbaseSink.type=org.apache.flume.sink.hbase.AsyncHBaseSink
>>> collector.sinks.hbaseSink.channel=fileChannel
>>> collector.sinks.hbaseSink.table=clicks
>>> collector.sinks.hbaseSink.columnFamily=data
>>> collector.sinks.hbaseSink.batchSize=100
>>> collector.sinks.hbaseSink.serializer=com.*.serializer.HBaseClickSerializer
>>> collector.sinks.hbaseSink.serializer.incrementColumn=icol
>>>
>>> collector.sinks.hdfsSink.type=hdfs
>>> collector.sinks.hdfsSink.channel=fileChannel
>>> collector.sinks.hdfsSink.hdfs.path=/data/%{data_parent}/%{data_type}/month=%Y-%m/day=%d
>>> collector.sinks.hdfsSink.hdfs.filePrefix=%{data_parent}_%{data_type}_%Y-%m-%d_%{host}
>>> collector.sinks.hdfsSink.hdfs.timeZone=America/Los_Angeles
>>> collector.sinks.hdfsSink.hdfs.fileType=DataStream
>>> collector.sinks.hdfsSink.hdfs.writeFormat=Text
>>> collector.sinks.hdfsSink.hdfs.rollSize=67100000
>>> collector.sinks.hdfsSink.hdfs.rollCount=0
>>> collector.sinks.hdfsSink.hdfs.rollInterval=3600
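
For anyone finding this thread later: a minimal sketch of the channel-per-sink layout David says fixed the HBase event loss. The channel names here are made up; the component names are borrowed from the collector config above. With Flume's default replicating channel selector, the source writes every event to both channels, so a slow or failing HBase sink no longer competes with the HDFS sink for the same events:

    # One dedicated file channel per sink; separate channels must not
    # share checkpoint or data directories.
    collector.sources.AvroIn.channels = hdfsChannel hbaseChannel

    collector.channels = hdfsChannel hbaseChannel
    collector.channels.hdfsChannel.type = file
    collector.channels.hdfsChannel.checkpointDir = /opt/flume/fc-hdfs/checkpoint
    collector.channels.hdfsChannel.dataDirs = /opt/flume/fc-hdfs/data
    collector.channels.hbaseChannel.type = file
    collector.channels.hbaseChannel.checkpointDir = /opt/flume/fc-hbase/checkpoint
    collector.channels.hbaseChannel.dataDirs = /opt/flume/fc-hbase/data

    # each sink drains its own channel
    collector.sinks.hdfsSink.channel = hdfsChannel
    collector.sinks.hbaseSink.channel = hbaseChannel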

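And on Israel's point about checking the logs: if the agent isn't logging at a useful level, a common way to surface sink exceptions on the console is to override the root logger when starting the agent (standard flume-ng options; the config file and agent names here are placeholders):

    bin/flume-ng agent --conf conf --conf-file collector.conf \
      --name collector -Dflume.root.logger=DEBUG,console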