Hi Jeff,

Thanks, I will try it out, but is there a reason why a single sink is so
slow? Is it simply a performance issue, or does the single-sink setup come
with some functional benefit?
Thanks,
Pal

On Mon, Oct 20, 2014 at 3:22 PM, Jeff Lord <[email protected]> wrote:
> Pal,
>
> You can add more sinks to your config.
> Don't put them in a sink group; just have multiple sinks pulling from the
> same channel. This should increase your throughput.
>
> Best,
>
> Jeff
>
> On Mon, Oct 20, 2014 at 3:49 AM, Pal Konyves <[email protected]> wrote:
>>
>> Hi there,
>>
>> We would like to write lots of logs to HDFS via Flume; you can imagine
>> it as a stress test, or a max-throughput test. I attached our Flume
>> config below. The problem is, HDFS writes are freakin' slow. Flume
>> writes to HDFS at ~5-10 Mbit/s (for us, that means a couple thousand
>> events per sec) on a 1 Gbit network.
>>
>> 'hadoop dfs -put' is fine; I checked the bandwidth usage with iftop.
>> The bandwidth usage was ~200 Mbit/s, and a 1 GB file is uploaded
>> within seconds, so HDFS itself should not be slow.
>>
>> Flume is capable of receiving the messages at around the same speed,
>> so the Avro source we use is not the issue; I can see the memory
>> channel filling up.
>>
>> On the other hand, in iftop I can see that while Flume receives the
>> events fast, it only writes to the datanode at 5-10 Mbit/s, very
>> very slow. Why is that? I tried to use huge batch sizes for the HDFS
>> sink (10 000 - 100 000 events), because supposedly batch writes are
>> always faster, but the sink only writes ~2 000 - 3 000 events per sec
>> according to the JMX console. Smaller batch sizes (1 000) are not
>> faster.
>>
>> I could not find the magic configuration that makes the HDFS sink
>> write faster, so I think something is generally wrong in Flume. I even
>> tried the PseudoTxn channel to disable transactions, but that did not
>> improve the write performance either.
>>
>> Setup:
>> I have a setup of 5 physical machines, strong ones (Dell T5600, 6-core
>> Xeon, Gigabit networking):
>> - Flume Avro client generating log messages
>> - Flume agent
>> - HDFS namenode
>> - HDFS datanode1
>> - HDFS datanode2
>>
>> +++++++++++++++++++++++++++++
>> Configuration: http://pastebin.com/53DGd3wm
>>
>> agent.sources = r1
>> agent.channels = memoryChannel1
>> agent.sinks = s1
>>
>>
>> ###############
>> # source
>> ##############
>>
>> # For each one of the sources, the type is defined
>> agent.sources.r1.type = avro
>>
>> agent.sources.r1.threads = 10
>> #agent.sources.r1.compression-type = deflate
>>
>> # The channel can be defined as follows.
>> agent.sources.r1.channels = memoryChannel1
>>
>> # avro-source specific configuration
>> agent.sources.r1.bind = 0.0.0.0
>> agent.sources.r1.port = 50414
>>
>> #############
>> # sink
>> ############
>> # Each sink's type must be defined
>> agent.sinks.s1.type = hdfs
>> agent.sinks.s1.hdfs.path = hdfs://namenode/flume/events/%D/%H/%M
>> agent.sinks.s1.hdfs.filePrefix = flume-events
>> agent.sinks.s1.hdfs.fileSuffix = .log
>> agent.sinks.s1.hdfs.fileType = DataStream
>>
>> # round down to every 15 minutes
>> agent.sinks.s1.hdfs.round = true
>> agent.sinks.s1.hdfs.roundValue = 15
>> agent.sinks.s1.hdfs.roundUnit = minute
>> agent.sinks.s1.hdfs.timeZone = UTC
>>
>> # never roll based on file size
>> agent.sinks.s1.hdfs.rollSize = 0
>> # never roll based on event count
>> agent.sinks.s1.hdfs.rollCount = 0
>> # roll every 1 minute
>> agent.sinks.s1.hdfs.rollInterval = 60
>> agent.sinks.s1.hdfs.threadsPoolSize = 10
>> agent.sinks.s1.hdfs.rollTimerPoolSize = 2
>>
>> # events written to the file before it is flushed to HDFS
>> agent.sinks.s1.hdfs.batchSize = 20000
>> # Specify the channel the sink should use
>> agent.sinks.s1.channel = memoryChannel1
>>
>> ##############
>> # channel
>> #############
>>
>> # Each channel's type is defined.
>> # agent.channels.memoryChannel1.type =
>> #     org.apache.flume.channel.PseudoTxnMemoryChannel
>> agent.channels.memoryChannel1.type = memory
>>
>>
>> # Other config values specific to each type of channel (sink or source)
>> # can be defined as well.
>> # In this case, they specify the capacity of the memory channel
>> agent.channels.memoryChannel1.capacity = 500000
>> agent.channels.memoryChannel1.transactionCapacity = 200000
>> agent.channels.memoryChannel1.byteCapacity = 1000000000
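A minimal sketch of Jeff's suggestion, assuming two HDFS sinks (s1, s2) draining the same memoryChannel1 with no sink group. The sink name s2 and the distinct filePrefix values are hypothetical additions, there so the two writers never open the same file:

```properties
agent.sinks = s1 s2

# both sinks pull from the same channel; deliberately no sinkgroup
agent.sinks.s1.channel = memoryChannel1
agent.sinks.s2.channel = memoryChannel1

agent.sinks.s1.type = hdfs
agent.sinks.s2.type = hdfs
agent.sinks.s1.hdfs.path = hdfs://namenode/flume/events/%D/%H/%M
agent.sinks.s2.hdfs.path = hdfs://namenode/flume/events/%D/%H/%M

# distinct prefixes so the sinks do not collide on file names
agent.sinks.s1.hdfs.filePrefix = flume-events-s1
agent.sinks.s2.hdfs.filePrefix = flume-events-s2
```

Each sink runs its own SinkRunner thread, so two sinks give two parallel HDFS writers against the one channel; the remaining hdfs.* settings would be duplicated for s2 the same way.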
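As a sanity check on the figures reported in the thread: ~10 Mbit/s at ~3 000 events/sec implies quite small events, which points at per-event/per-batch overhead in the sink rather than raw bandwidth. A back-of-the-envelope calculation (the event size below is only what falls out of the reported numbers, not a measured value):

```python
# Figures taken from the thread above (upper ends of the reported ranges)
write_rate_mbit = 10     # observed Flume -> datanode write rate, Mbit/s
events_per_sec = 3000    # observed sink throughput per JMX, events/sec

# Implied average event size at the observed rates
bytes_per_sec = write_rate_mbit * 1_000_000 / 8
implied_event_bytes = bytes_per_sec / events_per_sec
print(f"implied event size: {implied_event_bytes:.0f} bytes")  # ~417 bytes

# If the sink reached the ~200 Mbit/s that 'hadoop dfs -put' achieves,
# events of that size would flow at roughly:
potential_eps = (200 * 1_000_000 / 8) / implied_event_bytes
print(f"potential events/sec at 200 Mbit/s: {potential_eps:.0f}")  # 60000
```

So the link could in principle carry ~20x the observed event rate, which is consistent with the sink, not HDFS or the network, being the bottleneck.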
