Thanks Brock.

Madhu Munagala
(214)679-2872
On Apr 8, 2013, at 2:15 PM, Brock Noland <[email protected]> wrote:

> The channel is checkpointing. Reducing the channel's capacity or making
> whatever disk checkpointDir is on faster (a dedicated disk, SSD, etc.)
> would speed up checkpointing.
>
>
> On Mon, Apr 8, 2013 at 2:12 PM, Madhu Gmail <[email protected]> wrote:
>> Thanks Brock.
>>
>> But I am curious to know which other property settings can cause this
>> write-timeout issue.
>>
>> I have a sink batch size of 20 and no batch size on the source side of
>> the collector. This agent acts as a collector for other Flume agents,
>> which send their events to it.
>>
>> Madhu Munagala
>> (214)679-2872
>>
>> On Apr 8, 2013, at 1:42 PM, Brock Noland <[email protected]> wrote:
>>
>>> There is no harm in setting write-timeout to something like 30 seconds.
>>> In fact it probably makes sense to increase the default to 30 seconds.
>>>
>>>
>>> On Mon, Apr 8, 2013 at 1:38 PM, Madhu Gmail <[email protected]>
>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am getting the ERROR below in a Flume agent (acting as a collector)
>>>>> which receives log events from another Flume agent.
>>>>>
>>>>> I have also copied my flume-conf.properties at the end of this mail.
>>>>>
>>>>> Any idea how to tune the write-timeout value?
>>>>>
>>>>> 2013-04-05 13:17:33,197 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
>>>>> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fc]
>>>>>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake(FileChannel.java:434)
>>>>>         at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>>>>>         at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:91)
>>>>>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:189)
>>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>> 2013-04-05 13:17:33,427 INFO org.apache.flume.channel.file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1365169979081, queueSize: 0, queueHead: 362421
>>>>> 2013-04-05 13:17:34,233 INFO org.apache.flume.channel.file.LogFileV3: Updating log-14.meta currentPosition = 3818784, logWriteOrderID = 1365169979081
>>>>> 2013-04-05 13:17:34,294 INFO org.apache.flume.channel.file.Log: Updated checkpoint for file: /opt/sponge/flume/file-channel/dataDirs/log-14 position: 3818784 logWriteOrderID: 1365169979081
>>>>> 2013-04-05 13:17:34,294 DEBUG org.apache.flume.channel.file.Log: Rolling back 1365169950299
>>>>> 2013-04-05 13:17:34,296 ERROR org.apache.flume.source.AvroSource: Avro source S1: Unable to process event batch. Exception follows.
>>>>> org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel fc { dataDirs: [/opt/sponge/flume/file-channel/dataDirs] }
>>>>>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>>>>>         at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:237)
>>>>>         at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:597)
>>>>>         at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
>>>>>         at org.apache.avro.ipc.Responder.respond(Responder.java:149)
>>>>>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>>>>>         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
>>>>>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>>>>>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>>>>>         at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
>>>>>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)
>>>>>         at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
>>>>>         at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
>>>>>         at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
>>>>>         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
>>>>>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>>>>>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>>>>>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
>>>>>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
>>>>>         at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
>>>>>         at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
>>>>>         at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
>>>>>         at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>>>>>         at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>> Caused by: org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fc]
>>>>>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:400)
>>>>>         at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
>>>>>         at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:76)
>>>>>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
>>>>>         ... 28 more
>>>>> 2013-04-05 13:17:34,296 DEBUG org.apache.flume.channel.file.Log: Files currently in use: [14]
>>>>> 2013-04-05 13:17:34,305 INFO org.apache.avro.ipc.NettyServer: [id: 0x498b5a73, /10.42.202.131:42784 :> /10.96.172.44:1442] DISCONNECTED
>>>>> 2013-04-05 13:17:34,305 INFO org.apache.avro.ipc.NettyServer: [id: 0x498b5a73, /10.42.202.131:42784 :> /10.96.172.44:1442] UNBOUND
>>>>> 2013-04-05 13:17:34,305 INFO org.apache.avro.ipc.NettyServer: [id: 0x498b5a73, /10.42.202.131:42784 :> /10.96.172.44:1442] CLOSED
>>>>> 2013-04-05 13:17:34,305 INFO org.apache.avro.ipc.NettyServer: Connection to /10.42.202.131:42784 disconnected.
>>>>> 2013-04-05 13:17:38,200 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
>>>>> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>>>>>         at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>>>>>         at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>>>>>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)
>>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>> 2013-04-05 13:17:39,318 INFO org.apache.avro.ipc.NettyServer: [id: 0x4e48f32a, /10.42.202.131:44085 => /10.96.172.44:1442] OPEN
>>>>> 2013-04-05 13:17:43,202 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
>>>>> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>>>>>         at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>>>>>         at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>>>>>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)
>>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>> 2013-04-05 13:17:45,853 INFO org.apache.avro.ipc.NettyServer: [id: 0x4e48f32a, /10.42.202.131:44085 => /10.96.172.44:1442] BOUND: /10.96.172.44:1442
>>>>> 2013-04-05 13:17:45,853 INFO org.apache.avro.ipc.NettyServer: [id: 0x4e48f32a, /10.42.202.131:44085 => /10.96.172.44:1442] CONNECTED: /10.42.202.131:44085
>>>>> 2013-04-05 13:17:45,854 DEBUG org.apache.flume.source.AvroSource: Avro source S1: Received avro event batch of 39 events.
>>>>> 2013-04-05 13:17:45,958 DEBUG org.apache.flume.source.AvroSource: Avro source S1: Received avro event batch of 1 events.
>>>>> 2013-04-05 13:17:48,499 DEBUG org.apache.zookeeper.ClientCnxn: Got ping response for sessionid: 0x53dca4664900059 after 0ms
>>>>> 2013-04-05 13:17:50,854 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
>>>>> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>>>>>         at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>>>>>         at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>>>>>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)
>>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>> 2013-04-05 13:17:55,856 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
>>>>> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>>>>>         at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>>>>>         at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>>>>>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)
>>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>>
>>>>> ==================================================================
>>>>>
>>>>> col1.sources = S1
>>>>> col1.channels = fc
>>>>> col1.sinks = hBaseSink1
>>>>>
>>>>> col1.sources.S1.type = avro
>>>>> col1.sources.S1.bind = vm-15c2-3bbf
>>>>> col1.sources.S1.port = 1442
>>>>> col1.sources.S1.channels = fc
>>>>>
>>>>> # Each sink's type must be defined
>>>>> col1.sinks.hBaseSink1.type = org.apache.flume.sink.hbase.HBaseSink
>>>>> col1.sinks.hBaseSink1.table = elf_log
>>>>> col1.sinks.hBaseSink1.columnFamily = content
>>>>> col1.sinks.hBaseSink1.serializer = com.citi.sponge.flume.collector.sink.LogHbaseEventSerializer
>>>>> col1.sinks.hBaseSink1.timeout = 120
>>>>> col1.sinks.hBaseSink1.batchSize = 20
>>>>>
>>>>> # Specify the channel the sink should use
>>>>> col1.sinks.hBaseSink1.channel = fc
>>>>>
>>>>> # Each channel's type is defined.
>>>>> col1.channels.fc.type = file
>>>>> col1.channels.fc.checkpointDir = /opt/sponge/flume/file-channel/checkpoint
>>>>> col1.channels.fc.dataDirs = /opt/sponge/flume/file-channel/dataDirs
>>>>> col1.channels.fc.transactionCapacity = 1000
>>>>> col1.channels.fc.checkpointInterval = 30000
>>>>> col1.channels.fc.maxFileSize = 2146435071
>>>>> col1.channels.fc.minimumRequiredSpace = 524288000
>>>>> col1.channels.fc.keep-alive = 5
>>>>> col1.channels.fc.write-timeout = 10
>>>>> col1.channels.fc.checkpoint-timeout = 600
>>>>
>>>> Thanks
>>>> Madhu
>>>
>>> --
>>> Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
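[Putting the advice above together, a minimal sketch of the file-channel properties one might adjust on this collector. The write-timeout value follows Brock's suggestion; the SSD checkpoint path and the capacity figure are illustrative assumptions, not values from this thread.]

```properties
# Let writers wait longer for the log lock while a checkpoint is running
# (Brock suggests ~30 seconds; this channel currently uses 10).
col1.channels.fc.write-timeout = 30

# Illustrative: put the checkpoint on a dedicated/faster disk if one is available.
col1.channels.fc.checkpointDir = /ssd/flume/file-channel/checkpoint

# Illustrative: a smaller channel capacity also shortens each checkpoint.
col1.channels.fc.capacity = 100000
```

After editing flume-conf.properties, restart the agent and watch whether the "Failed to obtain lock" errors still appear around checkpoint time.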
