PROTON-881: Add Recv sample and required code changes / additions to the reactor


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0ac98e76
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0ac98e76
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0ac98e76

Branch: refs/heads/master
Commit: 0ac98e76be51fbd890613518541364301b191cc2
Parents: cd09de6
Author: Adrian Preston <prest...@uk.ibm.com>
Authored: Wed Apr 22 21:35:36 2015 +0100
Committer: Adrian Preston <prest...@uk.ibm.com>
Committed: Wed May 6 23:23:55 2015 +0100

----------------------------------------------------------------------
 .../qpid/proton/example/reactor/Recv.java       |  79 +++++++++++++
 .../apache/qpid/proton/reactor/Acceptor.java    |  27 +++++
 .../qpid/proton/reactor/FlowController.java     |  72 ++++++++++++
 .../org/apache/qpid/proton/reactor/Reactor.java |   3 +
 .../qpid/proton/reactor/impl/AcceptorImpl.java  | 112 +++++++++++++++++++
 .../qpid/proton/reactor/impl/IOHandler.java     |  25 +++--
 .../qpid/proton/reactor/impl/ReactorImpl.java   |  12 +-
 .../qpid/proton/reactor/impl/SelectorImpl.java  |  10 +-
 8 files changed, 326 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
----------------------------------------------------------------------
diff --git 
a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
 
b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
new file mode 100644
index 0000000..a5c6291
--- /dev/null
+++ 
b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.reactor.FlowController;
+import org.apache.qpid.proton.reactor.Handshaker;
+import org.apache.qpid.proton.reactor.Reactor;
+
+public class Recv extends BaseHandler {
+
+    private Recv() {
+        add(new Handshaker());
+        add(new FlowController());
+    }
+
+    @Override
+    public void onReactorInit(Event event) {
+        try {
+            // Create an amqp acceptor.
+            event.getReactor().acceptor("0.0.0.0", 5672);
+
+            // There is an optional third argument to the Reactor.acceptor
+            // call. Using it, we could supply a handler here that would
+            // become the handler for all accepted connections. If we omit
+            // it, the reactor simply inherets all the connection events.
+        } catch(IOException ioException) {
+            ioException.printStackTrace();    // TODO: what is the right 
answer?
+        }
+    }
+
+    @Override
+    public void onDelivery(Event event) {
+        Receiver recv = (Receiver)event.getLink();
+        Delivery delivery = recv.current();
+        if (delivery.isReadable() && !delivery.isPartial()) {
+            int size = delivery.pending();
+            byte[] buffer = new byte[size];
+            int read = recv.recv(buffer, 0, buffer.length);
+            recv.advance();
+
+            Message msg = Proton.message();
+            msg.decode(buffer, 0, read);
+            System.out.println(((AmqpValue)msg.getBody()).getValue());
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        Reactor r = Proton.reactor(new Recv());
+        r.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
new file mode 100644
index 0000000..91b20a3
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+public interface Acceptor {
+
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
new file mode 100644
index 0000000..c8b999b
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+
+public class FlowController extends BaseHandler {
+
+    private int drained;
+    private int window;
+
+    public FlowController(int window) {
+        // XXX: a window of 1 doesn't work because we won't necessarily get
+        // notified when the one allowed delivery is settled
+        if (window <= 1) throw new IllegalArgumentException();
+        this.window = window;
+        this.drained = 0;
+    }
+
+    public FlowController() {
+        this(1024);
+    }
+
+    private void topup(Receiver link, int window) {
+        int delta = window - link.getCredit();
+        link.flow(delta);
+    }
+
+    @Override
+    public void onUnhandled(Event event) {
+        int window = this.window;
+        Link link = event.getLink();
+
+        switch(event.getType()) {
+        case LINK_LOCAL_OPEN:
+        case LINK_REMOTE_OPEN:
+        case LINK_FLOW:
+        case DELIVERY:
+            if (link instanceof Receiver) {
+                this.drained += link.drained();
+                if (this.drained == 0) {
+                    topup((Receiver)link, window);
+                }
+            }
+            break;
+        default:
+            break;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 0c56a48..68375b1 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -97,4 +97,7 @@ public interface Reactor {
     // TODO: acceptorClose
 
     Connection connection(Handler handler);
+
+    Acceptor acceptor(String host, int port) throws IOException;
+    Acceptor acceptor(String host, int port, Handler handler) throws 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
new file mode 100644
index 0000000..efe0eb4
--- /dev/null
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.reactor.Acceptor;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+import org.apache.qpid.proton.reactor.Selectable.Callback;
+
+public class AcceptorImpl implements Acceptor {
+
+    private class AcceptorReadable implements Callback {
+        @Override
+        public void run(Selectable selectable) {
+            Reactor reactor = selectable.getReactor();
+            try {
+                SocketChannel socketChannel = 
((ServerSocketChannel)selectable.getChannel()).accept();
+                Handler handler = (Handler)selectable.getAttachment();
+                if (handler == null) {
+                    // TODO: set selectable.getAttachment() to null?
+                    handler = reactor.getHandler();
+                }
+                Connection conn = reactor.connection(handler);
+                Transport trans = Proton.transport();
+                // TODO: the C code calls pn_transport_set_server(trans) - is 
there a Java equivalent we need to worry about?
+                Sasl sasl = trans.sasl();
+                sasl.server();  // TODO: it would be nice if SASL was more 
pluggable than this (but this is what the C API currently does...)
+                //sasl.allowSkip(true); // TODO: this in in the C code - but 
the proton-j code throws a ProtonUnsupportedOperationException (as it is not 
implemented)
+                sasl.setMechanisms("ANONYMOUS");
+                sasl.done(SaslOutcome.PN_SASL_OK);
+                trans.bind(conn);
+                IOHandler.selectableTransport(reactor, socketChannel.socket(), 
trans);  // TODO: could we pass in a channel object instead of doing 
socketChannel.socket()?
+            } catch(IOException ioException) {
+                ioException.printStackTrace();
+                // TODO: what do we do with this exception?
+            }
+        }
+    }
+
+    private class AcceptorFinalize implements Callback {
+        @Override
+        public void run(Selectable selectable) {
+            try {
+                selectable.getChannel().close();
+            } catch(IOException ioException) {
+                ioException.printStackTrace();
+                // TODO: what now?
+            }
+        }
+    }
+
+    private final Selectable sel;
+
+    protected AcceptorImpl(Reactor reactor, String host, int port, Handler 
handler) throws IOException {
+        ServerSocketChannel ssc = ServerSocketChannel.open();
+        ssc.bind(new InetSocketAddress(host, port));
+        sel = reactor.selectable();
+        sel.setChannel(ssc);
+        sel.onReadable(new AcceptorReadable());
+        sel.onFinalize(new AcceptorFinalize()); // TODO: currently, this is 
not called from anywhere!!
+        sel.setReactor(reactor);
+        sel.setAttachment(handler);
+        sel.setReading(true);
+        reactor.update(sel);
+    }
+
+    @Override
+    public void close() {
+        if (!sel.isTerminal()) {
+            Reactor reactor = sel.getReactor();
+            try {
+                sel.getChannel().close();
+            } catch(IOException ioException) {
+                ioException.printStackTrace();
+                // TODO: what now?
+            }
+            sel.setChannel(null);
+            sel.terminate();
+            reactor.update(sel);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index ee988ee..a182722 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -78,6 +78,7 @@ public class IOHandler extends BaseHandler {
         }
         Transport transport = Proton.transport();
         Sasl sasl = transport.sasl();
+        sasl.client();
         sasl.setMechanisms("ANONYMOUS");
         transport.bind(connection);
     }
@@ -86,7 +87,7 @@ public class IOHandler extends BaseHandler {
     private void handleBound(Reactor reactor, Event event) {
         Connection connection = event.getConnection();
         String hostname = connection.getHostname();
-        if (hostname == null) {
+        if (hostname == null || hostname.equals("")) {
             return;
         }
 
@@ -116,7 +117,7 @@ public class IOHandler extends BaseHandler {
     }
 
     // pni_connection_capacity from connection.c
-    private int capacity(Selectable selectable) {
+    private static int capacity(Selectable selectable) {
         Transport transport = selectable.getTransport();
         int capacity = transport.capacity();
         if (capacity < 0) {
@@ -128,7 +129,7 @@ public class IOHandler extends BaseHandler {
     }
 
     // pni_connection_pending from connection.c
-    private int pending(Selectable selectable) {
+    private static int pending(Selectable selectable) {
         Transport transport = selectable.getTransport();
         int pending = transport.pending();
         if (pending < 0) {
@@ -140,7 +141,7 @@ public class IOHandler extends BaseHandler {
     }
 
     // pni_connection_deadline from connection.c
-    private long deadline(Selectable selectable) {
+    private static long deadline(Selectable selectable) {
         Reactor reactor = selectable.getReactor();
         Transport transport = selectable.getTransport();
         long deadline = transport.tick(reactor.now());
@@ -148,7 +149,7 @@ public class IOHandler extends BaseHandler {
     }
 
     // pni_connection_update from connection.c
-    private void update(Selectable selectable) {
+    private static void update(Selectable selectable) {
         int c = capacity(selectable);
         int p = pending(selectable);
         selectable.setReading(c > 0);
@@ -157,7 +158,7 @@ public class IOHandler extends BaseHandler {
     }
 
     // pni_connection_readable from connection.c
-    private class ConnectionReadable implements Callback {
+    private static class ConnectionReadable implements Callback {
         @Override
         public void run(Selectable selectable) {
             Reactor reactor = selectable.getReactor();
@@ -191,7 +192,7 @@ public class IOHandler extends BaseHandler {
     }
 
     // pni_connection_writable from connection.c
-    private class ConnectionWritable implements Callback {
+    private static class ConnectionWritable implements Callback {
         @Override
         public void run(Selectable selectable) {
             Reactor reactor = selectable.getReactor();
@@ -224,7 +225,7 @@ public class IOHandler extends BaseHandler {
     }
 
     // pni_connection_error from connection.c
-    private class ConnectionError implements Callback {
+    private static class ConnectionError implements Callback {
         @Override
         public void run(Selectable selectable) {
             Reactor reactor = selectable.getReactor();
@@ -235,7 +236,7 @@ public class IOHandler extends BaseHandler {
     }
 
     // pni_connection_expired from connection.c
-    private class ConnectionExpired implements Callback {
+    private static class ConnectionExpired implements Callback {
         @Override
         public void run(Selectable selectable) {
             Reactor reactor = selectable.getReactor();
@@ -250,7 +251,7 @@ public class IOHandler extends BaseHandler {
         }
     }
 
-    private class ConnectionFinalize implements Callback {
+    private static class ConnectionFinalize implements Callback {
         @Override
         public void run(Selectable selectable) {
             try {
@@ -263,11 +264,11 @@ public class IOHandler extends BaseHandler {
     }
 
     // pn_reactor_selectable_transport
-    private Selectable selectableTransport(Reactor reactor, Socket socket, 
Transport transport) {
+    protected static Selectable selectableTransport(Reactor reactor, Socket 
socket, Transport transport) {
         // TODO: this code needs to be able to deal with a null socket (this 
is our equivalent of PN_INVALID_SOCKET)
         Selectable selectable = reactor.selectable();
         selectable.setChannel(socket != null ? socket.getChannel() : null);
-        selectable.onReadable(new ConnectionReadable());
+        selectable.onReadable(new ConnectionReadable());    // TODO: *IF* 
these callbacks are stateless, do we more than one instance of them?
         selectable.onWritable(new ConnectionWritable());
         selectable.onError(new ConnectionError());
         selectable.onExpired(new ConnectionExpired());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 0a7f84d..9c4f817 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -37,6 +37,7 @@ import org.apache.qpid.proton.engine.Handler;
 import org.apache.qpid.proton.engine.impl.CollectorImpl;
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
 import org.apache.qpid.proton.engine.impl.HandlerEndpointImpl;
+import org.apache.qpid.proton.reactor.Acceptor;
 import org.apache.qpid.proton.reactor.Reactor;
 import org.apache.qpid.proton.reactor.ReactorChild;
 import org.apache.qpid.proton.reactor.Selectable;
@@ -454,7 +455,16 @@ public class ReactorImpl implements Reactor {
         connection.collect(collector);
         children.add(connection);
         ((ConnectionImpl)connection).setReactor(this);
-        // TODO: C code adds a reference back to the reactor from connection
         return connection;
     }
+
+    @Override
+    public Acceptor acceptor(String host, int port) throws IOException {
+        return this.acceptor(host, port, null);
+    }
+
+    @Override
+    public Acceptor acceptor(String host, int port, Handler handler) throws 
IOException {
+        return new AcceptorImpl(this, host, port, handler);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ac98e76/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index 35a6555..e4e284e 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.proton.reactor.impl;
 
 import java.io.IOException;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
 import java.util.HashSet;
 import java.util.Iterator;
 
@@ -58,7 +59,13 @@ public class SelectorImpl implements Selector {
     public void update(Selectable selectable) {
         if (selectable.getChannel() != null) {
             int interestedOps = 0;
-            if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ;
+            if (selectable.isReading()) {
+                if (selectable.getChannel() instanceof ServerSocketChannel) {
+                    interestedOps |= SelectionKey.OP_ACCEPT;
+                } else {
+                    interestedOps |= SelectionKey.OP_READ;
+                }
+            }
             if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
             SelectionKey key = selectable.getChannel().keyFor(selector);
             key.interestOps(interestedOps);
@@ -111,6 +118,7 @@ public class SelectorImpl implements Selector {
         for (SelectionKey key : selector.selectedKeys()) {
             Selectable selectable = (Selectable)key.attachment();
             if (key.isReadable()) readable.add(selectable);
+            if (key.isAcceptable()) readable.add(selectable);
             if (key.isWritable()) writeable.add(selectable);
         }
         selector.selectedKeys().clear();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to