Hi,
I have noticed a couple of issues in my Flume setup. Below I describe the setup, the issues, and the collector flume.conf.
Setup:
I'm using the Flume NG 1.4 (CDH 4.4) tarball for collecting aggregated logs.
I am running a two-tier (agent, collector) Flume configuration with custom plugins.
There are approximately 20 agent machines (receiving data) and 6 collector machines (writing to HDFS), all running independently.
Issues:
The agent tier seems to be running fine. However, I notice a couple of issues on the collector side (the collector flume.conf is included at the end of this email):
* Issue 1: Assume Flume is writing a file to HDFS and one of the datanodes in the write pipeline crashes. Flume does not recover from this situation. Ideally, Flume should abandon that file and continue processing; instead it keeps trying to reach the file, retries indefinitely, and stops doing any other work. Either I'm not doing something right, or there is a bug in the HDFS sink. (A sketch of the sink tweak I'm considering follows the log excerpt below.)
Collector stdout log excerpt:
10 Jun 2014 20:23:40,878 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.sink.hdfs.BucketWriter.append:477) - Caught IOException
writing to HDFSWriter (70000 millis timeout while waiting for channel to be
ready for read. ch : java.nio.channels.SocketChannel[connected
local=/10.64.4.22:58980 remote=/10.64.6.134:50010]). Closing file
(hdfs://namenode/data/2014/06/10/1900/stream/c-6-record1.1402429221442.tmp) and
rethrowing exception.
10 Jun 2014 20:23:40,904 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.sink.hdfs.BucketWriter.append:483) - Caught IOException
while closing file
(hdfs://namenode/data/2014/06/10/1900/rumnonhttp/c-6-rum24-nonhttprecord.1402429221442.tmp).
Exception follows.
java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel
to be ready for read. ch : java.nio.channels.SocketChannel[connected
local=/10.64.4.22:58980 remote=/10.64.6.134:50010]
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:165)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:156)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:129)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:117)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed(HdfsProtoUtil.java:169)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:954)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:922)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1023)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:821)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
10 Jun 2014 20:23:40,904 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.sink.hdfs.HDFSEventSink.process:438) - HDFS IO error
java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel
to be ready for read. ch : java.nio.channels.SocketChannel[connected
local=/10.64.4.22:58980 remote=/10.64.6.134:50010]
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:165)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:156)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:129)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:117)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed(HdfsProtoUtil.java:169)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:954)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:922)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1023)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:821)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
10 Jun 2014 20:23:41,617 INFO [New I/O server boss #1 ([id: 0x5201a55f,
/0.0.0.0:53000])]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) -
[id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] OPEN
10 Jun 2014 20:23:41,617 INFO [New I/O worker #8]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) -
[id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] BOUND:
/10.64.4.22:53000
10 Jun 2014 20:23:41,617 INFO [New I/O worker #8]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) -
[id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] CONNECTED:
/10.75.201.32:42877
10 Jun 2014 20:23:43,686 DEBUG [New I/O worker #8]
(com.viasat.flume.sources.RUMFilterAvroSource.appendBatch:359) - Avro source
AvroSource: Received avro event batch of 10000 events.
10 Jun 2014 20:23:44,646 ERROR [New I/O worker #7]
(com.viasat.flume.sources.RUMFilterAvroSource.appendBatch:401) - Avro source
AvroSource: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel:
org.apache.flume.channel.MemoryChannel{name: NonHttpHdfsChannel}
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at com.viasat.flume.sources.RUMFilterAvroSource.appendBatch(RUMFilterAvroSource.java:398)
.....
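As mentioned above, here is a sketch of the per-sink tweak I am considering (shown for k1 only; the other sinks would mirror it). hdfs.idleTimeout is a documented HDFS sink property that closes files which have stopped receiving appends; the value below is a guess, and I have not confirmed that the close actually succeeds once the write pipeline is already broken:
# k1 sink - hypothetical tweak, not a confirmed fix
# close a file after it has received no appends for 120 seconds, so a stale
# .tmp file is at least scheduled for close rather than being held open until
# the 1200 s rollInterval fires
agent.sinks.k1.hdfs.idleTimeout = 120
# for context: the 70000 ms timeout in the log above is thrown inside the HDFS
# client (DFSOutputStream), not by the sink's hdfs.callTimeout of 30000 ms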
* Issue 2: I am using a file channel in my collector configuration, and replaying its logs takes a very long time: around 24 hours to replay 12 GB of data. The file channel storage is on an Amazon EBS volume with provisioned IOPS, and dual checkpoints are enabled in the channel configuration. While going through the Flume logs I noticed a BadCheckpointException.
So, putting the pieces together: Flume found a bad checkpoint and then tried to replay the full 12 GB of data logs. What could make replaying 12 GB take around 24 hours?
P.S.: Each event/record averages about 2 KB.
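At roughly 2 KB per event, 12 GB is on the order of 6 million events, so the replay is progressing at only about 70 events per second, which seems very slow for a provisioned-IOPS volume.
For what it's worth, here is a sketch of what I plan to try and verify on one channel (c1); the other channels would mirror it. Both keys are from the file channel documentation, the values are guesses, and I have not confirmed they shorten replay. In particular, I want to verify the backup checkpoint key: I believe the documented spelling is backupCheckpointDir, and if my backUpCheckpointDir spelling is not being recognized, the dual-checkpoint fallback may never kick in, forcing a full replay.
# c1 channel - hypothetical tuning, not a confirmed fix
# backup checkpoint key spelled as I believe the file channel docs have it
agent.channels.c1.backupCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-1-channel/backUpCheckpoint
# cap individual data files; my understanding is that smaller files can be
# deleted sooner once fully taken, leaving less to replay after a bad checkpoint
agent.channels.c1.maxFileSize = 1073741824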
Flume Collector Configuration:
# Name the components on this agent
agent.sources = r1
agent.channels = c1 c2 c3 c4 c5
agent.sinks = k1 k2 k3 k4 k5
# Describe/configure the source r1
agent.sources.r1.type = CustomSource
agent.sources.r1.channels = c1 c2 c3 c4 c5
agent.sources.r1.bind = 0.0.0.0
agent.sources.r1.port = 53000
agent.sources.r1.schemaFolder = /usr/lib/flume-ng/schema
agent.sources.r1.selector.type = multiplexing
agent.sources.r1.selector.header = rectype
agent.sources.r1.selector.mapping.Record-1 = c1
agent.sources.r1.selector.mapping.Record-2 = c2
agent.sources.r1.selector.mapping.Record-3 = c3
agent.sources.r1.selector.mapping.Record-4 = c4
agent.sources.r1.selector.mapping.Record-5 = c5
# c1 channel config
agent.channels.c1.type = file
agent.channels.c1.useDualCheckpoints = true
agent.channels.c1.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-1-channel/checkpoint
agent.channels.c1.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-1-channel/backUpCheckpoint
agent.channels.c1.dataDirs = /usr/lib/flume-ng/datastore/collector/record-1-channel/logs
agent.channels.c1.capacity = 30000
agent.channels.c1.transactionCapacity = 3000
agent.channels.c1.write-timeout = 30
agent.channels.c1.keep-alive = 30
#c2 channel config
agent.channels.c2.type = file
agent.channels.c2.useDualCheckpoints = true
agent.channels.c2.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-2-channel/checkpoint
agent.channels.c2.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-2-channel/backUpCheckpoint
agent.channels.c2.dataDirs = /usr/lib/flume-ng/datastore/collector/record-2-channel/logs
agent.channels.c2.capacity = 30000
agent.channels.c2.transactionCapacity = 3000
agent.channels.c2.write-timeout = 30
agent.channels.c2.keep-alive = 30
# c3 channel config
agent.channels.c3.type = file
agent.channels.c3.useDualCheckpoints = true
agent.channels.c3.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-3-channel/checkpoint
agent.channels.c3.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-3-channel/backUpCheckpoint
agent.channels.c3.dataDirs = /usr/lib/flume-ng/datastore/collector/record-3-channel/logs
agent.channels.c3.capacity = 30000
agent.channels.c3.transactionCapacity = 3000
agent.channels.c3.write-timeout = 30
agent.channels.c3.keep-alive = 30
#c4 channel config
agent.channels.c4.type = file
agent.channels.c4.useDualCheckpoints = true
agent.channels.c4.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-4-channel/checkpoint
agent.channels.c4.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-4-channel/backUpCheckpoint
agent.channels.c4.dataDirs = /usr/lib/flume-ng/datastore/collector/record-4-channel/logs
agent.channels.c4.capacity = 30000
agent.channels.c4.transactionCapacity = 3000
agent.channels.c4.write-timeout = 30
agent.channels.c4.keep-alive = 30
#c5 channel config
agent.channels.c5.type = file
agent.channels.c5.useDualCheckpoints = true
agent.channels.c5.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-5-channel/checkpoint
agent.channels.c5.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-5-channel/backUpCheckpoint
agent.channels.c5.dataDirs = /usr/lib/flume-ng/datastore/collector/record-5-channel/logs
agent.channels.c5.capacity = 30000
agent.channels.c5.transactionCapacity = 3000
agent.channels.c5.write-timeout = 30
agent.channels.c5.keep-alive = 30
#k1 sink config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.serializer = CustomSerializer$Builder
agent.sinks.k1.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k1.serializer.schemaVersion = 24
agent.sinks.k1.serializer.syncIntervalBytes = 4096000
#agent.sinks.k1.serializer = avro
agent.sinks.k1.serializer.compressionCodec = snappy
agent.sinks.k1.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-1
agent.sinks.k1.hdfs.filePrefix = rec-1
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollInterval = 1200
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.callTimeout = 30000
agent.sinks.k1.hdfs.batchSize = 1000
#k2 sink config
agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c2
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k2.serializer = CustomSerializer$Builder
agent.sinks.k2.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k2.serializer.schemaVersion = 24
agent.sinks.k2.serializer.syncIntervalBytes = 4096000
#agent.sinks.k2.serializer = avro
agent.sinks.k2.serializer.compressionCodec = snappy
agent.sinks.k2.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-2
agent.sinks.k2.hdfs.filePrefix = rec-2
agent.sinks.k2.hdfs.rollSize = 0
agent.sinks.k2.hdfs.rollInterval = 1200
agent.sinks.k2.hdfs.rollCount = 0
agent.sinks.k2.hdfs.callTimeout = 30000
agent.sinks.k2.hdfs.batchSize = 1000
#k3 sink config
agent.sinks.k3.type = hdfs
agent.sinks.k3.channel = c3
agent.sinks.k3.hdfs.fileType = DataStream
agent.sinks.k3.serializer = CustomSerializer$Builder
agent.sinks.k3.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k3.serializer.schemaVersion = 24
agent.sinks.k3.serializer.syncIntervalBytes = 4096000
#agent.sinks.k3.serializer = avro
agent.sinks.k3.serializer.compressionCodec = snappy
agent.sinks.k3.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-3
agent.sinks.k3.hdfs.filePrefix = rec-3
agent.sinks.k3.hdfs.rollSize = 0
agent.sinks.k3.hdfs.rollInterval = 1200
agent.sinks.k3.hdfs.rollCount = 0
agent.sinks.k3.hdfs.callTimeout = 30000
agent.sinks.k3.hdfs.batchSize = 1000
#k4 sink config
agent.sinks.k4.type = hdfs
agent.sinks.k4.channel = c4
agent.sinks.k4.hdfs.fileType = DataStream
agent.sinks.k4.serializer = CustomSerializer$Builder
agent.sinks.k4.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k4.serializer.schemaVersion = 24
agent.sinks.k4.serializer.syncIntervalBytes = 4096000
#agent.sinks.k4.serializer = avro
agent.sinks.k4.serializer.compressionCodec = snappy
agent.sinks.k4.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-4
agent.sinks.k4.hdfs.filePrefix = rec-4
agent.sinks.k4.hdfs.rollSize = 0
agent.sinks.k4.hdfs.rollInterval = 1200
agent.sinks.k4.hdfs.rollCount = 0
agent.sinks.k4.hdfs.callTimeout = 30000
agent.sinks.k4.hdfs.batchSize = 1000
#k5 sink config
agent.sinks.k5.type = hdfs
agent.sinks.k5.channel = c5
agent.sinks.k5.hdfs.fileType = DataStream
agent.sinks.k5.serializer = CustomSerializer$Builder
agent.sinks.k5.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k5.serializer.schemaVersion = 24
agent.sinks.k5.serializer.syncIntervalBytes = 4096000
#agent.sinks.k5.serializer = avro
agent.sinks.k5.serializer.compressionCodec = snappy
agent.sinks.k5.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-5
agent.sinks.k5.hdfs.filePrefix = rec-5
agent.sinks.k5.hdfs.rollSize = 0
agent.sinks.k5.hdfs.rollInterval = 1200
agent.sinks.k5.hdfs.rollCount = 0
agent.sinks.k5.hdfs.callTimeout = 30000
agent.sinks.k5.hdfs.batchSize = 1000
Any inputs or suggestions?
Thanks
-Kushal Mangtani