Looks like you are hitting Avro IPC timeouts - you should probably increase them,
especially if you are talking over a WAN.
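
For example, both of the Avro sink's timeout properties take milliseconds and can
be raised together. A minimal sketch, assuming an agent named agent with a sink
named avro-sink (substitute your own names):

agent.sinks.avro-sink.connect-timeout = 60000
agent.sinks.avro-sink.request-timeout = 60000

connect-timeout covers the initial handshake request (the "Handshake timed out"
case), and request-timeout covers the batch sends after that (the "RPC request
timed out" case).
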
--
Hari Shreedharan
On Tuesday, April 16, 2013 at 11:38 AM, Chris Neal wrote:
> I'm seeing the same thing :)
>
> Mine is all on a local LAN though, so the fact that an RPC call doesn't get a
> reply in 10000ms or 20000ms is quite odd. My configuration is for the most part
> the same as Denis's: a two-tiered system, with ExecSources running tail -F on
> log files into an AvroSink, which sends to an AvroSource loading into HDFS on
> the back tier.
>
> I, too, see this on the AvroSink.
>
> Either (A):
> [2013-04-15 23:57:14.827] [org.apache.flume.sink.LoadBalancingSinkProcessor] [ WARN] [SinkRunner-PollingRunner-LoadBalancingSinkProcessor] [] (LoadBalancingSinkProcessor.java:process:154) Sink failed to consume event. Attempting next sink if available.
> org.apache.flume.EventDeliveryException: Failed to send events
>     at org.apache.flume.sink.AvroSink.process(AvroSink.java:324)
>     at org.apache.flume.sink.LoadBalancingSinkProcessor.process(LoadBalancingSinkProcessor.java:151)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:619)
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: hadoopjt01.pegs.com, port: 10000 }: Failed to send batch
>     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
>     at org.apache.flume.sink.AvroSink.process(AvroSink.java:308)
>     ... 3 more
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: hadoopjt01.pegs.com, port: 10000 }: Handshake timed out after 20000ms
>     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:280)
>     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
>     ... 4 more
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:91)
>     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:278)
>     ... 5 more
>
> or (B):
> [2013-04-15 19:49:01.135] [org.apache.flume.sink.LoadBalancingSinkProcessor] [ WARN] [SinkRunner-PollingRunner-LoadBalancingSinkProcessor] [] (LoadBalancingSinkProcessor.java:process:154) Sink failed to consume event. Attempting next sink if available.
> org.apache.flume.EventDeliveryException: Failed to send events
>     at org.apache.flume.sink.AvroSink.process(AvroSink.java:324)
>     at org.apache.flume.sink.LoadBalancingSinkProcessor.process(LoadBalancingSinkProcessor.java:151)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:619)
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: hadoopjt01.pegs.com, port: 10000 }: Failed to send batch
>     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
>     at org.apache.flume.sink.AvroSink.process(AvroSink.java:308)
>     ... 3 more
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: hadoopjt01.pegs.com, port: 10000 }: RPC request timed out
>     at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:321)
>     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
>     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
>     ... 4 more
> Caused by: java.util.concurrent.TimeoutException
>     at org.apache.avro.ipc.CallFuture.get(CallFuture.java:132)
>     at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
>     ... 6 more
>
> The only thing I see on the AvroSource tier is the disconnect/reconnect
> happening:
>
> [2013-04-15 19:49:00.992] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-10] [] (NettyServer.java:handleUpstream:171) [id: 0x2a24ed78, /138.113.127.4:34481 :> /138.113.127.72:10000] DISCONNECTED
> [2013-04-15 19:49:00.992] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-10] [] (NettyServer.java:handleUpstream:171) [id: 0x2a24ed78, /138.113.127.4:34481 :> /138.113.127.72:10000] UNBOUND
> [2013-04-15 19:49:00.992] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-10] [] (NettyServer.java:handleUpstream:171) [id: 0x2a24ed78, /138.113.127.4:34481 :> /138.113.127.72:10000] CLOSED
> [2013-04-15 19:49:00.993] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-10] [] (NettyServer.java:channelClosed:209) Connection to /138.113.127.4:34481 disconnected.
> [2013-04-15 19:49:03.331] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-10-thread-1] [] (NettyServer.java:handleUpstream:171) [id: 0x3883b82e, /138.113.127.4:62442 => /138.113.127.72:10000] OPEN
> [2013-04-15 19:49:03.332] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-13] [] (NettyServer.java:handleUpstream:171) [id: 0x3883b82e, /138.113.127.4:62442 => /138.113.127.72:10000] BOUND: /138.113.127.72:10000
> [2013-04-15 19:49:03.333] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-13] [] (NettyServer.java:handleUpstream:171) [id: 0x3883b82e, /138.113.127.4:62442 => /138.113.127.72:10000] CONNECTED: /138.113.127.4:62442
>
> Is there some way to determine exactly where this bottleneck is? The list of
> config options for the AvroSource/AvroSink is quite short, so there is not
> much tuning to do. Here is what I have:
>
> # avro-hadoopjt01-sink properties
> udprodae01.sinks.avro-hadoopjt01-sink.type = avro
> udprodae01.sinks.avro-hadoopjt01-sink.hostname = hadoopjt01.pegs.com
> udprodae01.sinks.avro-hadoopjt01-sink.port = 10000
> udprodae01.sinks.avro-hadoopjt01-sink.batch-size = 100
>
> # avro-hadoopjt01-source properties
> hadoopjt01-1.sources.avro-hadoopjt01-source.type = avro
> hadoopjt01-1.sources.avro-hadoopjt01-source.bind = hadoopjt01.pegs.com
> hadoopjt01-1.sources.avro-hadoopjt01-source.port = 10000
> hadoopjt01-1.sources.avro-hadoopjt01-source.threads = 64
>
>
> I can try increasing the AvroSink timeout values, but they seem quite
> adequate at the defaults. Maybe more threads on the AvroSource?
>
> Any advice would be much appreciated!
> Chris
>
> On Wed, Feb 6, 2013 at 12:10 PM, Denis Lowe <[email protected]> wrote:
> > I noticed that the logs on the destination server were reporting dropped
> > connections from the upstream server:
> >
> > NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x08e56d76,
> > /SOURCE_HOST:43599 :> /LOCAL_HOST:4003] DISCONNECTED
> > NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x08e56d76,
> > /SOURCE_HOST:43599 :> /LOCAL_HOST:4003] UNBOUND
> > NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x08e56d76,
> > /SOURCE_HOST:43599 :> /LOCAL_HOST:4003] CLOSED
> > NettyServer$NettyServerAvroHandler.channelClosed:209) - Connection to
> > /SOURCE_HOST:43599 disconnected.
> >
> > The other thing I observed is that these errors only ever occur on our EU
> > servers (connecting to our centralized downstream collectors in US West).
> > We are running Flume in Amazon EC2.
> >
> > I can see in the log that the connection is restored quite quickly.
> >
> > I gather that the network latency between Europe and US West is causing the
> > connection between the two servers to 'appear' lost, thus resulting in the
> > above errors?
> >
> > Are there any recommended config settings to compensate for this?
> >
> > On 6 February 2013 00:21, Juhani Connolly <[email protected]> wrote:
> > > Is there anything unusual in the logs for the destination (AvroSource)
> > > server?
> > >
> > > Since the error is happening in the AvroSink, no data is getting lost.
> > > The failed batch gets rolled back, its removal from the local channel is
> > > cancelled, and the sink will attempt to resend it.
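> > >
> > > Roughly, the sink's process() call follows Flume's standard channel
> > > transaction pattern (a simplified sketch, not the exact AvroSink code):
> > >
> > > Transaction txn = channel.getTransaction();
> > > txn.begin();
> > > try {
> > >     Event event = channel.take();  // events remain owned by the transaction
> > >     // ... send the batch over Avro RPC ...
> > >     txn.commit();    // only now are the events removed from the channel
> > > } catch (Throwable t) {
> > >     txn.rollback();  // on a timeout the events go back into the channel
> > >     throw new EventDeliveryException("Failed to send events", t);
> > > } finally {
> > >     txn.close();
> > > }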
> > >
> > >
> > > On 02/06/2013 03:23 PM, Denis Lowe wrote:
> > > > We are running Flume-NG 1.3.1 and have noticed the following ERROR
> > > > occurring periodically (a few times daily):
> > > >
> > > > We are using the File Channel connecting to 2 downstream collector
> > > > agents in 'round_robin' mode, using avro source/sinks.
> > > >
> > > > We are using the config described below to deliver 5 different log
> > > > types (to 5 different ports downstream) and have observed the below
> > > > error occurring randomly across all the ports.
> > > >
> > > > We tried doubling the connect-timeout to 40000 (from the default of
> > > > 20000) with no success.
> > > > The agent appears to recover and keep on processing data.
> > > >
> > > >
> > > > My question is:
> > > > Has this data been lost, or will Flume eventually retry until a
> > > > successful delivery has been made?
> > > > Are there any other config changes I can make to prevent or reduce this
> > > > occurring in the future?
> > > >
> > > >
> > > > 05 Feb 2013 23:12:21,650 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
> > > > org.apache.flume.EventDeliveryException: Failed to send events
> > > >     at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
> > > >     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > > >     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > > >     at java.lang.Thread.run(Thread.java:662)
> > > > Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: collector1, port: 4003 }: Failed to send batch
> > > >     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
> > > >     at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
> > > >     ... 3 more
> > > > Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: collector2, port: 4003 }: RPC request timed out
> > > >     at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:321)
> > > >     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
> > > >     at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
> > > >     ... 4 more
> > > > Caused by: java.util.concurrent.TimeoutException
> > > >     at org.apache.avro.ipc.CallFuture.get(CallFuture.java:132)
> > > >     at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
> > > >     ... 6 more
> > > >
> > > >
> > > > Below is a snapshot of the current config:
> > > >
> > > > agent.sources.eventdata.command = tail -qn +0 -F /var/log/event-logs/live/eventdata.log
> > > > agent.sources.eventdata.channels = eventdata-avro-channel
> > > > agent.sources.eventdata.batchSize = 10
> > > > agent.sources.eventdata.restart = true
> > > >
> > > > ## event source interceptor
> > > > agent.sources.eventdata.interceptors.host-interceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> > > > agent.sources.eventdata.interceptors.host-interceptor.hostHeader = source-host
> > > > agent.sources.eventdata.interceptors.host-interceptor.useIP = false
> > > >
> > > > ## eventdata channel
> > > > agent.channels.eventdata-avro-channel.type = file
> > > > agent.channels.eventdata-avro-channel.checkpointDir = /mnt/flume-ng/checkpoint/eventdata-avro-channel
> > > > agent.channels.eventdata-avro-channel.dataDirs = /mnt/flume-ng/data1/eventdata-avro-channel,/mnt/flume-ng/data2/eventdata-avro-channel
> > > > agent.channels.eventdata-avro-channel.maxFileSize = 210000000
> > > > agent.channels.eventdata-avro-channel.capacity = 2000000
> > > > agent.channels.eventdata-avro-channel.checkpointInterval = 300000
> > > > agent.channels.eventdata-avro-channel.transactionCapacity = 10000
> > > > agent.channels.eventdata-avro-channel.keep-alive = 20
> > > > agent.channels.eventdata-avro-channel.write-timeout = 20
> > > >
> > > > ## 2 x downstream click sinks for load balancing and failover
> > > > agent.sinks.eventdata-avro-sink-1.type = avro
> > > > agent.sinks.eventdata-avro-sink-1.channel = eventdata-avro-channel
> > > > agent.sinks.eventdata-avro-sink-1.hostname = collector1
> > > > agent.sinks.eventdata-avro-sink-1.port = 4003
> > > > agent.sinks.eventdata-avro-sink-1.batch-size = 100
> > > > agent.sinks.eventdata-avro-sink-1.connect-timeout = 40000
> > > >
> > > > agent.sinks.eventdata-avro-sink-2.type = avro
> > > > agent.sinks.eventdata-avro-sink-2.channel = eventdata-avro-channel
> > > > agent.sinks.eventdata-avro-sink-2.hostname = collector2
> > > > agent.sinks.eventdata-avro-sink-2.port = 4003
> > > > agent.sinks.eventdata-avro-sink-2.batch-size = 100
> > > > agent.sinks.eventdata-avro-sink-2.connect-timeout = 40000
> > > >
> > > > ## load balance config
> > > > agent.sinkgroups = eventdata-avro-sink-group
> > > > agent.sinkgroups.eventdata-avro-sink-group.sinks = eventdata-avro-sink-1 eventdata-avro-sink-2
> > > > agent.sinkgroups.eventdata-avro-sink-group.processor.type = load_balance
> > > > agent.sinkgroups.eventdata-avro-sink-group.processor.selector = round_robin
> > > >
> > > >
> > > > Denis.
> >
>