Author: rhs
Date: Wed Oct 17 04:40:37 2007
New Revision: 585453

URL: http://svn.apache.org/viewvc?rev=585453&view=rev
Log:
added a bit of API around MinaHandler

Added:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java?rev=585453&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java
 Wed Oct 17 04:40:37 2007
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
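+/**
+ * Binding maps a transport onto an endpoint: {@code endpoint} returns the
+ * E for a given Sender, and {@code receiver} returns the Receiver for that E.
+ */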
+
+public interface Binding<E,T>
+{
+
+    E endpoint(Sender<T> sender);
+
+    Receiver<T> receiver(E endpoint);
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java?rev=585453&r1=585452&r2=585453&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
 Wed Oct 17 04:40:37 2007
@@ -35,6 +35,7 @@
 import org.apache.mina.transport.socket.nio.SocketAcceptor;
 import org.apache.mina.transport.socket.nio.SocketConnector;
 
+import org.apache.qpidity.transport.Binding;
 import org.apache.qpidity.transport.Connection;
 import org.apache.qpidity.transport.ConnectionDelegate;
 import org.apache.qpidity.transport.Receiver;
@@ -54,7 +55,7 @@
  * @author Rafael H. Schloming
  */
 //RA making this public until we sort out the package issues
-public class MinaHandler implements IoHandler
+public class MinaHandler<E> implements IoHandler
 {
 
     private static final Logger log = Logger.get(MinaHandler.class);
@@ -64,18 +65,17 @@
         ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
     }
 
-    private final ConnectionDelegate delegate;
-    private final InputHandler.State state;
+    private final Binding<E,java.nio.ByteBuffer> binding;
 
-    public MinaHandler(ConnectionDelegate delegate, InputHandler.State state)
+    private MinaHandler(Binding<E,java.nio.ByteBuffer> binding)
     {
-        this.delegate = delegate;
-        this.state = state;
+        this.binding = binding;
     }
 
+
     public void messageReceived(IoSession ssn, Object obj)
     {
-        Attachment attachment = (Attachment) ssn.getAttachment();
+        Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
         ByteBuffer buf = (ByteBuffer) obj;
         attachment.receiver.received(buf.buf());
     }
@@ -98,17 +98,16 @@
     public void sessionOpened(final IoSession ssn)
     {
         log.debug("opened: %s", this);
-        // XXX: hardcoded max-frame
-        Connection conn = new Connection
-            (new Disassembler(new OutputHandler(new MinaSender(ssn)),
-                              64*1024 - 1),
-             delegate);
-        Receiver<java.nio.ByteBuffer> receiver =
-            new InputHandler(new Assembler(conn), state);
-        ssn.setAttachment(new Attachment(conn, receiver));
-        // XXX
+        E endpoint = binding.endpoint(new MinaSender(ssn));
+        Attachment<E>  attachment =
+            new Attachment<E>(endpoint, binding.receiver(endpoint));
+
+        // We need to synchronize and notify here because the MINA
+        // connect future returns the session prior to the attachment
+        // being set. This is arguably a bug in MINA.
         synchronized (ssn)
         {
+            ssn.setAttachment(attachment);
             ssn.notifyAll();
         }
     }
@@ -116,7 +115,7 @@
     public void sessionClosed(IoSession ssn)
     {
         log.debug("closed: %s", ssn);
-        Attachment attachment = (Attachment) ssn.getAttachment();
+        Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
         attachment.receiver.closed();
         ssn.setAttachment(null);
     }
@@ -126,41 +125,53 @@
         // do nothing
     }
 
-    private class Attachment
+    private static class Attachment<E>
     {
 
-        Connection connection;
+        E endpoint;
         Receiver<java.nio.ByteBuffer> receiver;
 
-        Attachment(Connection connection,
-                   Receiver<java.nio.ByteBuffer> receiver)
+        Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver)
         {
-            this.connection = connection;
+            this.endpoint = endpoint;
             this.receiver = receiver;
         }
     }
 
     public static final void accept(String host, int port,
-                                    ConnectionDelegate delegate)
+                                    Binding<?,java.nio.ByteBuffer> binding)
+        throws IOException
+    {
+        accept(new InetSocketAddress(host, port), binding);
+    }
+
+    public static final <E> void accept(SocketAddress address,
+                                        Binding<E,java.nio.ByteBuffer> binding)
         throws IOException
     {
         IoAcceptor acceptor = new SocketAcceptor();
-        acceptor.bind(new InetSocketAddress(host, port),
-                      new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
+        acceptor.bind(address, new MinaHandler<E>(binding));
     }
 
-    public static final Connection connect(String host, int port,
-                                           ConnectionDelegate delegate)
+    public static final <E> E connect(String host, int port,
+                                      Binding<E,java.nio.ByteBuffer> binding)
     {
-        MinaHandler handler = new MinaHandler(delegate,
-                                              InputHandler.State.FRAME_HDR);
-        SocketAddress addr = new InetSocketAddress(host, port);
+        return connect(new InetSocketAddress(host, port), binding);
+    }
+
+    public static final <E> E connect(SocketAddress address,
+                                      Binding<E,java.nio.ByteBuffer> binding)
+    {
+        MinaHandler<E> handler = new MinaHandler<E>(binding);
         SocketConnector connector = new SocketConnector();
         connector.setWorkerTimeout(0);
-        ConnectFuture cf = connector.connect(addr, handler);
+        ConnectFuture cf = connector.connect(address, handler);
         cf.join();
         IoSession ssn = cf.getSession();
-        // XXX
+
+        // We need to synchronize and wait here because the MINA
+        // connect future returns the session prior to the attachment
+        // being set. This is arguably a bug in MINA.
         synchronized (ssn)
         {
             while (ssn.getAttachment() == null)
@@ -175,8 +186,53 @@
                 }
             }
         }
-        Attachment attachment = (Attachment) ssn.getAttachment();
-        return attachment.connection;
+
+        Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
+        return attachment.endpoint;
+    }
+
+    public static final void accept(String host, int port,
+                                    ConnectionDelegate delegate)
+        throws IOException
+    {
+        accept(host, port, new ConnectionBinding
+               (delegate, InputHandler.State.PROTO_HDR));
+    }
+
+    public static final Connection connect(String host, int port,
+                                           ConnectionDelegate delegate)
+    {
+        return connect(host, port, new ConnectionBinding
+                       (delegate, InputHandler.State.FRAME_HDR));
+    }
+
+    private static class ConnectionBinding
+        implements Binding<Connection,java.nio.ByteBuffer>
+    {
+
+        private final ConnectionDelegate delegate;
+        private final InputHandler.State state;
+
+        ConnectionBinding(ConnectionDelegate delegate,
+                          InputHandler.State state)
+        {
+            this.delegate = delegate;
+            this.state = state;
+        }
+
+        public Connection endpoint(Sender<java.nio.ByteBuffer> sender)
+        {
+            // XXX: hardcoded max-frame
+            return new Connection
+                (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
+                 delegate);
+        }
+
+        public Receiver<java.nio.ByteBuffer> receiver(Connection conn)
+        {
+            return new InputHandler(new Assembler(conn), state);
+        }
+
     }
 
 }


Reply via email to