Hello,
I am sending events to a Flume Avro source over Avro RPC. The connection
is fine, but when I send the events it throws an AvroRemoteException.
I have an application that uses the RPC client factory to connect
to the Avro source. The RpcClient connects to the Avro source
successfully, but when I send events to it I get an
AvroRemoteException.
This is the code snippet I am using:
AvroSource source = new AvroSource();
Channel channel = new MemoryChannel();
List<Channel> ch = new ArrayList<Channel>();
ch.add(channel);
ChannelSelector cs = new ReplicatingChannelSelector();
cs.setChannels(ch);
Configurables.configure(cs, new Context());
ChannelProcessor cp = new ChannelProcessor(cs);
Context context = new Context();
context.put("bind", "localhost");
context.put("port", "12345");
source.configure(context);
source.setChannelProcessor(cp);
source.start();
// Server server = new NettyServer(
//     new SpecificResponder(AvroSource.class, source),
//     new InetSocketAddress("localhost", 12345));
// server.start();
AvroSourceProtocol client = SpecificRequestor.getClient(
    AvroSourceProtocol.class,
    new NettyTransceiver(new InetSocketAddress(12345)));
AvroFlumeEvent avroEvent = new AvroFlumeEvent();
avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
avroEvent.setBody(ByteBuffer.wrap("Hello avro".getBytes()));
Status status = client.append(avroEvent);
Transaction transaction = channel.getTransaction();
transaction.begin();
Event event1 = channel.take();
transaction.commit();
transaction.close();
And I am getting the following exception:
[New I/O server worker #1-1] WARN org.apache.avro.ipc.Responder - user error
java.lang.NullPointerException
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:183)
    at org.apache.flume.source.AvroSource.append(AvroSource.java:179)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
    at org.apache.avro.ipc.Responder.respond(Responder.java:149)
    at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:140)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
    at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:125)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)