Same questions: Can you take and share 8-10 thread dumps while the sink is taking events "slowly"?
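For reference, one way to grab those dumps is jstack against the agent's JVM. A minimal sketch — the `flume` pattern for locating the pid is just an assumption; adjust it for how you launch the agent:

```shell
# Find the Flume agent's JVM pid (the 'flume' match is an assumption;
# adjust the pattern for how your agent is launched).
FLUME_PID=$(jps -l | awk '/flume/ {print $1; exit}')

# Take 10 thread dumps, ~5 seconds apart, while the sink is draining slowly.
for i in $(seq 1 10); do
  jstack "$FLUME_PID" > "threaddump_${i}.txt"
  sleep 5
done
```

Attach the resulting threaddump_*.txt files to your reply so we can see where the sink threads are spending their time.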
Can you share your machine configuration?

On Dec 17, 2013 7:57 AM, "David Sinclair" <[email protected]> wrote:

> Hi,
>
> I am using a File Channel connected to an AMQP Source and an HDFS Sink.
> Events are coming in at a rate of around 1500 msg/sec, and the AMQP source
> is batching them 1000 at a time. The writing to the file channel seems to
> be keeping up well with this rate. However, when the HDFS Sink, also with a
> batch size of 1000, tries to read out of the channel, it cannot come close
> to keeping up with that rate. I haven't set up the data directory and
> checkpoint directory to write to different disks yet, but I was hoping I
> was doing something obviously wrong that would account for this behaviour.
> I have also played with the batch size of the HDFS sink, and that doesn't
> seem to make much of a difference. I also realize that I can add additional
> sinks, but I was more curious whether people have experienced the same
> behavior I am seeing.
>
> Config is below.
>
> # Sources, channels, and sinks are defined per
> # agent name, in this case 'collector'.
> collector.sources = amqpSource1
> collector.channels = file1
> collector.sinks = hdfsSink1
>
> #### AMQP Source
> collector.sources.amqpSource1.type = **.AmqpSource
> collector.sources.amqpSource1.host = <host>
> collector.sources.amqpSource1.virtualHost = <vhost>
> collector.sources.amqpSource1.requestedHeartbeat = 15
> collector.sources.amqpSource1.exchangeName = a7-events
> collector.sources.amqpSource1.exchangeType = topic
> collector.sources.amqpSource1.bindings = *.*.*.*.*
> collector.sources.amqpSource1.queueName = flumeEvents
> collector.sources.amqpSource1.autoAck = true
> collector.sources.amqpSource1.batchSize = 1000
> # The queue will go away when this source is shut down
> collector.sources.amqpSource1.autoDeleteQueue = true
> collector.sources.amqpSource1.interceptors = i1 i2
> collector.sources.amqpSource1.interceptors.i1.type = ***.HostNameInterceptor$Builder
> collector.sources.amqpSource1.interceptors.i2.type = host
> collector.sources.amqpSource1.interceptors.i2.hostHeader = flumeHostname
> collector.sources.amqpSource1.interceptors.i2.useIP = false
>
> #### File Channel
> collector.channels.file1.type = file
> collector.channels.file1.maxFileSize = 10737418240
> collector.channels.file1.capacity = 100000000
> collector.channels.file1.transactionCapacity = 5000
> collector.channels.file1.checkpointDir = /data/1/flume/file-channel1/checkpoint
> collector.channels.file1.dataDirs = /data/1/flume/file-channel1/data
>
> ########################################################################
> ### HDFS Sinks
> collector.sinks.hdfsSink1.type = hdfs
> collector.sinks.hdfsSink1.hdfs.fileType = SequenceFile
> collector.sinks.hdfsSink1.hdfs.filePrefix = data_%{flumeHostname}_%{sourceHostname}_1_
> collector.sinks.hdfsSink1.hdfs.fileSuffix = .seq
> collector.sinks.hdfsSink1.hdfs.rollInterval = 300
> collector.sinks.hdfsSink1.hdfs.idleTimeout = 120
> collector.sinks.hdfsSink1.hdfs.rollSize = 0
> collector.sinks.hdfsSink1.hdfs.rollCount = 0
> collector.sinks.hdfsSink1.hdfs.batchSize = 1000
> collector.sinks.hdfsSink1.hdfs.path = /user/flume/udb/test/kpi/%Y/%m/%d/%H
> collector.sinks.hdfsSink1.hdfs.writeFormat = **.KpiSequenceFileSerializer$Builder
> collector.sinks.hdfsSink1.hdfs.codeC = gzip
> ########################################################################
>
> collector.sources.amqpSource1.channels = file1
> collector.sinks.hdfsSink1.channel = file1
> collector.sinks.unuHdfsSinkWc2.channel = file1
>
> I don't know if this is expected performance of the file channel, or not.
> I should note that the average message size is around 4K.
>
> Any help is appreciated.
>
> dave
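While you gather the dumps, one tuning worth trying: put the channel's checkpoint and data directories on separate physical disks, and optionally drain the same channel with a second HDFS sink. A minimal sketch against your config, assuming a second disk mounted at /data/2 (the hdfsSink2 name and the /data/2 path are illustrative, not from your setup — hdfsSink2 would also need the same hdfs.* settings as hdfsSink1):

```
# Checkpoint and data on different spindles
collector.channels.file1.checkpointDir = /data/2/flume/file-channel1/checkpoint
collector.channels.file1.dataDirs = /data/1/flume/file-channel1/data

# Second sink draining the same channel in parallel
collector.sinks = hdfsSink1 hdfsSink2
collector.sinks.hdfsSink2.type = hdfs
collector.sinks.hdfsSink2.channel = file1
```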
