Title: Samsung Enterprise Portal mySingle

Hi there,

I'm testing Flume with a Thrift source, a file channel, and an HDFS sink.

A Flume client sends events to it over Thrift, and each event can be up to 30 MB in size.

It works fine for a short period, but after a few minutes the following error is logged by org.apache.flume.channel.ChannelProcessor.

8:15:36.450 PM ERROR org.apache.flume.channel.ChannelProcessor
Error while writing to required channel: FileChannel c1 { dataDirs: [/data/2/flumechannel/data] }
java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Bits.java:658)
	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
	at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
	at sun.nio.ch.IOUtil.write(IOUtil.java:58)
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
	at org.apache.flume.channel.file.LogFile$Writer.write(LogFile.java:313)
	at org.apache.flume.channel.file.LogFile$Writer.put(LogFile.java:267)
	at org.apache.flume.channel.file.Log.put(Log.java:633)
	at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:469)
	at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
	at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
	at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:265)
	at org.apache.flume.source.ThriftSource$ThriftSourceHandler.append(ThriftSource.java:253)
	at org.apache.flume.thrift.ThriftSourceProtocol$Processor$append.getResult(ThriftSourceProtocol.java:251)
	at org.apache.flume.thrift.ThriftSourceProtocol$Processor$append.getResult(ThriftSourceProtocol.java:236)
	at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
	at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:478)
	at org.apache.thrift.server.Invocation.run(Invocation.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

I increased the maximum direct memory size to 2 GB, but that didn't help.
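
To narrow this down, the part of the trace from FileChannelImpl.write down to ByteBuffer.allocateDirect can be exercised outside Flume. The following is only a minimal sketch under my own assumptions, not Flume code: the class name DirectPoolProbe, the temp file, and the 30 MiB size are made up for illustration. It writes a heap-backed ByteBuffer through a FileChannel and reports how much the JVM's "direct" buffer pool grew, which on the HotSpot JVMs I know of should be roughly the size of the buffer, since the write goes through a temporary direct buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical probe class, not part of Flume.
class DirectPoolProbe {

    /** Bytes currently held by the JVM's "direct" buffer pool, or -1 if no such pool is reported. */
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    /** Writes a heap buffer of the given size to a temp file and returns the number of bytes written. */
    static int writeHeapBuffer(int size) {
        // Heap-backed buffer, standing in for an event body held in a byte[].
        ByteBuffer heap = ByteBuffer.allocate(size);
        try {
            Path tmp = Files.createTempFile("probe", ".bin");
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                int written = 0;
                while (heap.hasRemaining()) {
                    written += ch.write(heap);
                }
                return written;
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int size = 30 * 1024 * 1024; // assumption: 30 MiB, the largest event size
        long before = directMemoryUsed();
        int written = writeHeapBuffer(size);
        long after = directMemoryUsed();
        System.out.println("written=" + written + " directPoolGrowth=" + (after - before));
    }
}
```

If the growth stays after the write returns, running the same write from several threads should show whether the usage adds up per thread.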

Here's my Flume configuration.

 

#source

tier1.sources.s1.type     = thrift
tier1.sources.s1.bind     = 0.0.0.0
tier1.sources.s1.port     = 30010
tier1.sources.s1.channels = c0 c1 memdefault
tier1.sources.s1.selector.type = multiplexing
tier1.sources.s1.selector.header = category
tier1.sources.s1.selector.mapping.Log4j = c0
tier1.sources.s1.selector.mapping.Data =
tier1.sources.s1.selector.default = memDefault

 

#channel

tier1.channels.c1.type   = file
tier1.channels.c1.checkpointDir=/data/2/flumechannel/checkpoint
tier1.channels.c1.dataDirs=/data/2/flumechannel/data
tier1.channels.c1.transactionCapacity = 1
tier1.channels.c1.maxFileSize = 500000000

 

#sink

tier1.sinks.k1.type         = hdfs
tier1.sinks.k1.channel      = c1
tier1.sinks.k1.hdfs.path = /user/soul
tier1.sinks.k1.hdfs.round = false
tier1.sinks.k1.hdfs.fileType = DataStream
tier1.sinks.k1.hdfs.rollCount = 1
tier1.sinks.k1.hdfs.batchSize = 1
tier1.sinks.k1.hdfs.retryInterval = 10
tier1.sinks.k1.hdfs.proxyUser = soul
tier1.sinks.k1.hdfs.maxOpenFiles = 10
tier1.sinks.k1.hdfs.idleTimeout = 900

 

And the JVM options: -Xmx2g -XX:MaxDirectMemorySize=2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
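
As a rough sanity check on the 2g limit, here is a back-of-the-envelope sketch. It rests on an assumption I have not verified: that each thread writing to the channel may hold on to one direct buffer about as large as the biggest event. The class and method names are hypothetical, and I'm treating 30 Mbytes as 30 MiB.

```java
// Hypothetical helper for a rough estimate only; not part of Flume or the JDK.
class DirectBudget {

    /** Number of whole buffers of bufferBytes that fit under limitBytes. */
    static long maxBuffers(long limitBytes, long bufferBytes) {
        if (bufferBytes <= 0) {
            throw new IllegalArgumentException("bufferBytes must be positive");
        }
        return limitBytes / bufferBytes;
    }

    public static void main(String[] args) {
        long limit = 2L * 1024 * 1024 * 1024; // -XX:MaxDirectMemorySize=2g
        long event = 30L * 1024 * 1024;       // assumption: 30 MiB per event
        System.out.println(maxBuffers(limit, event)); // prints 68
    }
}
```

So if that assumption holds, about 68 such buffers alive at the same time would be enough to use up the whole 2g.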

 

When I use a memory channel instead of the file channel, it works fine.

I can't understand this behavior.

The only clue I have is that the exception always occurs after the log message "org.apache.flume.channel.file.Log: Roll end".

 

 

I'd appreciate any comments.

Thank you.

 

 
