PROTON-881: Add Recv sample and required code changes / additions to the reactor
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0ac98e76 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0ac98e76 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0ac98e76 Branch: refs/heads/master Commit: 0ac98e76be51fbd890613518541364301b191cc2 Parents: cd09de6 Author: Adrian Preston <prest...@uk.ibm.com> Authored: Wed Apr 22 21:35:36 2015 +0100 Committer: Adrian Preston <prest...@uk.ibm.com> Committed: Wed May 6 23:23:55 2015 +0100 ---------------------------------------------------------------------- .../qpid/proton/example/reactor/Recv.java | 79 +++++++++++++ .../apache/qpid/proton/reactor/Acceptor.java | 27 +++++ .../qpid/proton/reactor/FlowController.java | 72 ++++++++++++ .../org/apache/qpid/proton/reactor/Reactor.java | 3 + .../qpid/proton/reactor/impl/AcceptorImpl.java | 112 +++++++++++++++++++ .../qpid/proton/reactor/impl/IOHandler.java | 25 +++-- .../qpid/proton/reactor/impl/ReactorImpl.java | 12 +- .../qpid/proton/reactor/impl/SelectorImpl.java | 10 +- 8 files changed, 326 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java new file mode 100644 index 0000000..a5c6291 --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.reactor.FlowController; +import org.apache.qpid.proton.reactor.Handshaker; +import org.apache.qpid.proton.reactor.Reactor; + +public class Recv extends BaseHandler { + + private Recv() { + add(new Handshaker()); + add(new FlowController()); + } + + @Override + public void onReactorInit(Event event) { + try { + // Create an amqp acceptor. + event.getReactor().acceptor("0.0.0.0", 5672); + + // There is an optional third argument to the Reactor.acceptor + // call. Using it, we could supply a handler here that would + // become the handler for all accepted connections. If we omit + // it, the reactor simply inherets all the connection events. + } catch(IOException ioException) { + ioException.printStackTrace(); // TODO: what is the right answer? + } + } + + @Override + public void onDelivery(Event event) { + Receiver recv = (Receiver)event.getLink(); + Delivery delivery = recv.current(); + if (delivery.isReadable() && !delivery.isPartial()) { + int size = delivery.pending(); + byte[] buffer = new byte[size]; + int read = recv.recv(buffer, 0, buffer.length); + recv.advance(); + + Message msg = Proton.message(); + msg.decode(buffer, 0, read); + System.out.println(((AmqpValue)msg.getBody()).getValue()); + } + } + + public static void main(String[] args) throws IOException { + Reactor r = Proton.reactor(new Recv()); + r.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java new file mode 100644 index 0000000..91b20a3 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor; + +public interface Acceptor { + + void close(); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java new file mode 100644 index 0000000..c8b999b --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor; + +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; + +public class FlowController extends BaseHandler { + + private int drained; + private int window; + + public FlowController(int window) { + // XXX: a window of 1 doesn't work because we won't necessarily get + // notified when the one allowed delivery is settled + if (window <= 1) throw new IllegalArgumentException(); + this.window = window; + this.drained = 0; + } + + public FlowController() { + this(1024); + } + + private void topup(Receiver link, int window) { + int delta = window - link.getCredit(); + link.flow(delta); + } + + @Override + public void onUnhandled(Event event) { + int window = this.window; + Link link = event.getLink(); + + switch(event.getType()) { + case LINK_LOCAL_OPEN: + case LINK_REMOTE_OPEN: + case LINK_FLOW: + case DELIVERY: + if (link instanceof Receiver) { + this.drained += link.drained(); + if (this.drained == 0) { + topup((Receiver)link, window); + } + } + break; + default: + break; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java index 0c56a48..68375b1 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java @@ -97,4 +97,7 @@ public interface Reactor { // TODO: acceptorClose Connection connection(Handler handler); + + Acceptor acceptor(String host, int port) throws IOException; + Acceptor acceptor(String host, int port, Handler handler) throws IOException; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java new file mode 100644 index 0000000..efe0eb4 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Sasl.SaslOutcome; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.reactor.Acceptor; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Selectable; +import org.apache.qpid.proton.reactor.Selectable.Callback; + +public class AcceptorImpl implements Acceptor { + + private class AcceptorReadable implements Callback { + @Override + public void run(Selectable selectable) { + Reactor reactor = selectable.getReactor(); + try { + SocketChannel socketChannel = ((ServerSocketChannel)selectable.getChannel()).accept(); + Handler handler = (Handler)selectable.getAttachment(); + if (handler == null) { + // TODO: set selectable.getAttachment() to null? + handler = reactor.getHandler(); + } + Connection conn = reactor.connection(handler); + Transport trans = Proton.transport(); + // TODO: the C code calls pn_transport_set_server(trans) - is there a Java equivalent we need to worry about? + Sasl sasl = trans.sasl(); + sasl.server(); // TODO: it would be nice if SASL was more pluggable than this (but this is what the C API currently does...) + //sasl.allowSkip(true); // TODO: this in in the C code - but the proton-j code throws a ProtonUnsupportedOperationException (as it is not implemented) + sasl.setMechanisms("ANONYMOUS"); + sasl.done(SaslOutcome.PN_SASL_OK); + trans.bind(conn); + IOHandler.selectableTransport(reactor, socketChannel.socket(), trans); // TODO: could we pass in a channel object instead of doing socketChannel.socket()? + } catch(IOException ioException) { + ioException.printStackTrace(); + // TODO: what do we do with this exception? + } + } + } + + private class AcceptorFinalize implements Callback { + @Override + public void run(Selectable selectable) { + try { + selectable.getChannel().close(); + } catch(IOException ioException) { + ioException.printStackTrace(); + // TODO: what now? + } + } + } + + private final Selectable sel; + + protected AcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException { + ServerSocketChannel ssc = ServerSocketChannel.open(); + ssc.bind(new InetSocketAddress(host, port)); + sel = reactor.selectable(); + sel.setChannel(ssc); + sel.onReadable(new AcceptorReadable()); + sel.onFinalize(new AcceptorFinalize()); // TODO: currently, this is not called from anywhere!! + sel.setReactor(reactor); + sel.setAttachment(handler); + sel.setReading(true); + reactor.update(sel); + } + + @Override + public void close() { + if (!sel.isTerminal()) { + Reactor reactor = sel.getReactor(); + try { + sel.getChannel().close(); + } catch(IOException ioException) { + ioException.printStackTrace(); + // TODO: what now? + } + sel.setChannel(null); + sel.terminate(); + reactor.update(sel); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java index ee988ee..a182722 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -78,6 +78,7 @@ public class IOHandler extends BaseHandler { } Transport transport = Proton.transport(); Sasl sasl = transport.sasl(); + sasl.client(); sasl.setMechanisms("ANONYMOUS"); transport.bind(connection); } @@ -86,7 +87,7 @@ public class IOHandler extends BaseHandler { private void handleBound(Reactor reactor, Event event) { Connection connection = event.getConnection(); String hostname = connection.getHostname(); - if (hostname == null) { + if (hostname == null || hostname.equals("")) { return; } @@ -116,7 +117,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_capacity from connection.c - private int capacity(Selectable selectable) { + private static int capacity(Selectable selectable) { Transport transport = selectable.getTransport(); int capacity = transport.capacity(); if (capacity < 0) { @@ -128,7 +129,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_pending from connection.c - private int pending(Selectable selectable) { + private static int pending(Selectable selectable) { Transport transport = selectable.getTransport(); int pending = transport.pending(); if (pending < 0) { @@ -140,7 +141,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_deadline from connection.c - private long deadline(Selectable selectable) { + private static long deadline(Selectable selectable) { Reactor reactor = selectable.getReactor(); Transport transport = selectable.getTransport(); long deadline = transport.tick(reactor.now()); @@ -148,7 +149,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_update from connection.c - private void update(Selectable selectable) { + private static void update(Selectable selectable) { int c = capacity(selectable); int p = pending(selectable); selectable.setReading(c > 0); @@ -157,7 +158,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_readable from connection.c - private class ConnectionReadable implements Callback { + private static class ConnectionReadable implements Callback { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); @@ -191,7 +192,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_writable from connection.c - private class ConnectionWritable implements Callback { + private static class ConnectionWritable implements Callback { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); @@ -224,7 +225,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_error from connection.c - private class ConnectionError implements Callback { + private static class ConnectionError implements Callback { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); @@ -235,7 +236,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_expired from connection.c - private class ConnectionExpired implements Callback { + private static class ConnectionExpired implements Callback { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); @@ -250,7 +251,7 @@ public class IOHandler extends BaseHandler { } } - private class ConnectionFinalize implements Callback { + private static class ConnectionFinalize implements Callback { @Override public void run(Selectable selectable) { try { @@ -263,11 +264,11 @@ public class IOHandler extends BaseHandler { } // pn_reactor_selectable_transport - private Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) { + protected static Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) { // TODO: this code needs to be able to deal with a null socket (this is our equivalent of PN_INVALID_SOCKET) Selectable selectable = reactor.selectable(); selectable.setChannel(socket != null ? socket.getChannel() : null); - selectable.onReadable(new ConnectionReadable()); + selectable.onReadable(new ConnectionReadable()); // TODO: *IF* these callbacks are stateless, do we more than one instance of them? selectable.onWritable(new ConnectionWritable()); selectable.onError(new ConnectionError()); selectable.onExpired(new ConnectionExpired()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index 0a7f84d..9c4f817 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -37,6 +37,7 @@ import org.apache.qpid.proton.engine.Handler; import org.apache.qpid.proton.engine.impl.CollectorImpl; import org.apache.qpid.proton.engine.impl.ConnectionImpl; import org.apache.qpid.proton.engine.impl.HandlerEndpointImpl; +import org.apache.qpid.proton.reactor.Acceptor; import org.apache.qpid.proton.reactor.Reactor; import org.apache.qpid.proton.reactor.ReactorChild; import org.apache.qpid.proton.reactor.Selectable; @@ -454,7 +455,16 @@ public class ReactorImpl implements Reactor { connection.collect(collector); children.add(connection); ((ConnectionImpl)connection).setReactor(this); - // TODO: C code adds a reference back to the reactor from connection return connection; } + + @Override + public Acceptor acceptor(String host, int port) throws IOException { + return this.acceptor(host, port, null); + } + + @Override + public Acceptor acceptor(String host, int port, Handler handler) throws IOException { + return new AcceptorImpl(this, host, port, handler); + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java index 35a6555..e4e284e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java @@ -23,6 +23,7 @@ package org.apache.qpid.proton.reactor.impl; import java.io.IOException; import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; import java.util.HashSet; import java.util.Iterator; @@ -58,7 +59,13 @@ public class SelectorImpl implements Selector { public void update(Selectable selectable) { if (selectable.getChannel() != null) { int interestedOps = 0; - if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ; + if (selectable.isReading()) { + if (selectable.getChannel() instanceof ServerSocketChannel) { + interestedOps |= SelectionKey.OP_ACCEPT; + } else { + interestedOps |= SelectionKey.OP_READ; + } + } if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE; SelectionKey key = selectable.getChannel().keyFor(selector); key.interestOps(interestedOps); @@ -111,6 +118,7 @@ public class SelectorImpl implements Selector { for (SelectionKey key : selector.selectedKeys()) { Selectable selectable = (Selectable)key.attachment(); if (key.isReadable()) readable.add(selectable); + if (key.isAcceptable()) readable.add(selectable); if (key.isWritable()) writeable.add(selectable); } selector.selectedKeys().clear(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org