Hi there,

We would like to write a lot of logs to HDFS via Flume; you can think of it as a stress test, or a max-throughput test. I attached our Flume config below. The problem is that the HDFS writes are freakin' slow: Flume writes to HDFS at only ~5-10 Mbit/s (for us, that means a couple of thousand events per second) on a 1 Gbit network.
A 'hadoop dfs -put' of the same data is fine: I checked the bandwidth usage with iftop and it was ~200 Mbit/s, and a 1 GB file is uploaded within seconds, so HDFS itself should not be slow. Flume is also capable of receiving the messages at roughly the same speed, so the Avro source we use is not the issue either; I can see the memory channel filling up. On the other hand, in iftop I can see that while Flume receives the events fast, it writes to the datanode at only 5-10 Mbit/s, very very slow. Why is that?

I tried huge batch sizes for the HDFS sink (10,000-100,000 events), because batch writes are supposedly always faster, but the sink still only writes ~2,000-3,000 events per second according to the JMX console. Smaller batch sizes (1,000) are not faster either; the exact variants I tried are listed after the configuration below. I could not find the magic configuration that makes the HDFS sink write faster, so I think there is generally something wrong in Flume. I even tried the PseudoTxnMemoryChannel to disable transactions, without any improvement in write performance.

Setup: five physical machines, strong ones (Dell T5600, 6-core Xeon, Gigabit networking):
- Flume Avro client generating log messages
- Flume agent
- HDFS namenode
- HDFS datanode1
- HDFS datanode2

+++++++++++++++++++++++++++++
Configuration: http://pastebin.com/53DGd3wm

agent.sources = r1
agent.channels = memoryChannel1
agent.sinks = s1

###############
# source
##############
# For each one of the sources, the type is defined
agent.sources.r1.type = avro
agent.sources.r1.threads = 10
#agent.sources.r1.compression-type=deflate

# The channel can be defined as follows.
agent.sources.r1.channels = memoryChannel1

# avro source specific configuration
agent.sources.r1.bind = 0.0.0.0
agent.sources.r1.port = 50414

#############
# sink
############
# Each sink's type must be defined
agent.sinks.s1.type = hdfs
agent.sinks.s1.hdfs.path = hdfs://namenode/flume/events/%D/%H/%M
agent.sinks.s1.hdfs.filePrefix = flume-events
agent.sinks.s1.hdfs.fileSuffix = .log
agent.sinks.s1.hdfs.fileType = DataStream

# round timestamps down to every 15 minutes
agent.sinks.s1.hdfs.round = true
agent.sinks.s1.hdfs.roundValue = 15
agent.sinks.s1.hdfs.roundUnit = minute
agent.sinks.s1.hdfs.timeZone = UTC

# never roll based on file size
agent.sinks.s1.hdfs.rollSize = 0
# never roll based on event count
agent.sinks.s1.hdfs.rollCount = 0
# roll every 1 minute
agent.sinks.s1.hdfs.rollInterval = 60

agent.sinks.s1.hdfs.threadsPoolSize = 10
agent.sinks.s1.hdfs.rollTimerPoolSize = 2

# events written to a file before it is flushed to HDFS
agent.sinks.s1.hdfs.batchSize = 20000

# Specify the channel the sink should use
agent.sinks.s1.channel = memoryChannel1

##############
# channel
#############
# Each channel's type is defined.
#agent.channels.memoryChannel1.type = org.apache.flume.channel.PseudoTxnMemoryChannel
agent.channels.memoryChannel1.type = memory

# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel1.capacity = 500000
agent.channels.memoryChannel1.transactionCapacity = 200000
agent.channels.memoryChannel1.byteCapacity = 1000000000
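For completeness, these are the batch-size variants I cycled through (a rough sketch; everything else stays exactly as in the config above, and the channel's transactionCapacity of 200,000 is always larger than the batch size, since a sink batch has to fit into a single channel transaction):

# batch-size variants, tried one at a time
#agent.sinks.s1.hdfs.batchSize = 1000
#agent.sinks.s1.hdfs.batchSize = 10000
agent.sinks.s1.hdfs.batchSize = 20000
#agent.sinks.s1.hdfs.batchSize = 100000

None of them got the sink past ~2,000-3,000 events per second.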
