Author: ritchiem
Date: Wed Nov 15 08:07:31 2006
New Revision: 475286

URL: http://svn.apache.org/viewvc?view=rev&rev=475286
Log:
QPID-92: Changes to bring MINA usage into line with the MINA head (1.1.0) API

Modified:
    incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java
    incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java
    incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
    incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Client.java
    incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Server.java
    incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
    incubator/qpid/trunk/qpid/java/cluster/src/org/apache/qpid/server/cluster/Main.java
    incubator/qpid/trunk/qpid/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
    incubator/qpid/trunk/qpid/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar
    incubator/qpid/trunk/qpid/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar
    incubator/qpid/trunk/qpid/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar
    incubator/qpid/trunk/qpid/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java
 Wed Nov 15 08:07:31 2006
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.client.transport;
 
-import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.jms.BrokerDetails;
@@ -29,7 +28,7 @@
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 
 import java.io.IOException;
@@ -54,7 +53,7 @@
     public void connect(AMQProtocolHandler protocolHandler, BrokerDetails 
brokerDetail)
             throws IOException
     {
-        
ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+        
ByteBuffer.setPreferDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
 
         // the MINA default is currently to use the pooled allocator although 
this may change in future
         // once more testing of the performance of the simple allocator has 
been done
@@ -64,17 +63,15 @@
         }
 
         final IoConnector ioConnector = 
_socketConnectorFactory.newSocketConnector();
-        SocketConnectorConfig cfg = (SocketConnectorConfig) 
ioConnector.getDefaultConfig();
-
         // if we do not use our own thread model we get the MINA default which 
is to use
         // its own leader-follower model
         boolean readWriteThreading = 
Boolean.getBoolean("amqj.shared_read_write_pool");
         if (readWriteThreading)
         {
-            cfg.setThreadModel(new ReadWriteThreadModel());
+            ioConnector.setThreadModel(new ReadWriteThreadModel());
         }
 
-        SocketSessionConfig scfg = (SocketSessionConfig) 
cfg.getSessionConfig();
+        SocketSessionConfig scfg = (SocketSessionConfig) 
ioConnector.getSessionConfig();
         
scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay",
 "true")));
         scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", 
DEFAULT_BUFFER_SIZE));
         _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
@@ -83,7 +80,8 @@
         final InetSocketAddress address = new 
InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
         protocolHandler.setUseSSL(brokerDetail.useSSL());
         _logger.info("Attempting connection to " + address);
-        ConnectFuture future = ioConnector.connect(address, protocolHandler);
+        ioConnector.setHandler(protocolHandler);
+        ConnectFuture future = ioConnector.connect(address);
 
         // wait for connection to complete
         if (future.join(brokerDetail.getTimeout()))

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
 Wed Nov 15 08:07:31 2006
@@ -23,8 +23,6 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoServiceConfig;
-
 
 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -34,7 +32,6 @@
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
 
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -68,9 +65,7 @@
     {
         _acceptor = new VmPipeAcceptor();
 
-        IoServiceConfig config = _acceptor.getDefaultConfig();
-
-        config.setThreadModel(new ReadWriteThreadModel());
+        _acceptor.setThreadModel(new ReadWriteThreadModel());
     }
 
     public static ITransportConnection getInstance() throws 
AMQTransportConnectionException
@@ -140,7 +135,7 @@
                 break;
             case VM:
             {
-                _instance = getVMTransport(details, 
Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+                _instance = getVMTransport(details, 
Boolean.getBoolean("amqj.noAutoCreateVMBroker"));
                 break;
             }
         }
@@ -163,20 +158,23 @@
         return -1;
     }
 
-    private static ITransportConnection getVMTransport(BrokerDetails details, 
boolean AutoCreate) throws AMQVMBrokerCreationException
+    private static ITransportConnection getVMTransport(BrokerDetails details, 
boolean noAutoCreate) throws AMQVMBrokerCreationException
     {
         int port = details.getPort();
 
         if (!_inVmPipeAddress.containsKey(port))
         {
-            if (AutoCreate)
+            if (noAutoCreate)
             {
-                createVMBroker(port);
+                throw new AMQVMBrokerCreationException(port, "VM Broker on 
port " + port + " does not exist. Auto create disabled.");
+
             }
             else
             {
-                throw new AMQVMBrokerCreationException(port, "VM Broker on 
port " + port + " does not exist. Auto create disabled.");
+                _logger.info("Auto Creating VMBroker on port " + port);
+                createVMBroker(port);
             }
+
         }
 
         return new VmPipeTransportConnection(port);
@@ -197,7 +195,9 @@
 
                 provider = createBrokerInstance(port);
 
-                _acceptor.bind(pipe, provider);
+                _acceptor.setLocalAddress(pipe);
+                _acceptor.setHandler(provider);
+                _acceptor.bind();
 
                 _inVmPipeAddress.put(port, pipe);
                 _logger.info("Created InVM Qpid.AMQP listening on port " + 
port);
@@ -213,7 +213,7 @@
 
                     try
                     {
-                        _acceptor.unbind(pipe);
+                        _acceptor.unbind();
                     }
                     catch (Exception ignore)
                     {
@@ -225,8 +225,10 @@
                         provider = createBrokerInstance(port);
                     }
 
-                    _acceptor.bind(pipe, provider);
-                    _inVmPipeAddress.put(port, pipe);
+                    _acceptor.setLocalAddress(pipe);
+                    _acceptor.setHandler(provider);
+                    _acceptor.bind();
+                    _inVmPipeAddress.put(port, _acceptor);
                     _logger.info("Created InVM Qpid.AMQP listening on port " + 
port);
                 }
                 catch (IOException justUseFirstException)
@@ -294,14 +296,14 @@
     public static void killAllVMBrokers()
     {
         _logger.info("Killing all VM Brokers");
-        _acceptor.unbindAll();
 
         Iterator keys = _inVmPipeAddress.keySet().iterator();
 
         while (keys.hasNext())
         {
             int id = (Integer) keys.next();
-            _inVmPipeAddress.remove(id);
+
+            ((VmPipeAcceptor)_inVmPipeAddress.remove(id)).unbind();
         }
 
     }
@@ -313,7 +315,7 @@
         {
             _logger.info("Killing VM Broker:" + port);
             _inVmPipeAddress.remove(port);
-            _acceptor.unbind(pipe);
+            _acceptor.unbind();
         }
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java
 Wed Nov 15 08:07:31 2006
@@ -28,7 +28,6 @@
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.mina.transport.vmpipe.VmPipeConnector;
 
@@ -48,18 +47,18 @@
     public void connect(AMQProtocolHandler protocolHandler, BrokerDetails 
brokerDetail) throws IOException
     {
         final VmPipeConnector ioConnector = new VmPipeConnector();
-        final IoServiceConfig cfg = ioConnector.getDefaultConfig();
         ReferenceCountingExecutorService executorService = 
ReferenceCountingExecutorService.getInstance();
         PoolingFilter asyncRead = new PoolingFilter(executorService, 
PoolingFilter.READ_EVENTS,
                                                     "AsynchronousReadFilter");
-        cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
+        ioConnector.getFilterChain().addFirst("AsynchronousReadFilter", 
asyncRead);
         PoolingFilter asyncWrite = new PoolingFilter(executorService, 
PoolingFilter.WRITE_EVENTS,
                                                      
"AsynchronousWriteFilter");
-        cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
+        ioConnector.getFilterChain().addLast("AsynchronousWriteFilter", 
asyncWrite);
         
         final VmPipeAddress address = new VmPipeAddress(_port);
         _logger.info("Attempting connection to " + address);
-        ConnectFuture future = ioConnector.connect(address, protocolHandler);
+        ioConnector.setHandler(protocolHandler);
+        ConnectFuture future = ioConnector.connect(address);
         // wait for connection to complete
         future.join();
         // we call getSession which throws an IOException if there has been an 
error connecting

Modified: 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
 Wed Nov 15 08:07:31 2006
@@ -159,11 +159,6 @@
             return null;  //To change body of implemented methods use File | 
Settings | File Templates.
         }
 
-        public IoServiceConfig getServiceConfig()
-        {
-            return null;
-        }
-
         public IoHandler getHandler()
         {
             return null;  //To change body of implemented methods use File | 
Settings | File Templates.
@@ -199,7 +194,7 @@
             return null;  //To change body of implemented methods use File | 
Settings | File Templates.
         }
 
-        public int getScheduledWriteRequests()
+        public int getScheduledWriteMessages()
         {
             return 0;  //To change body of implemented methods use File | 
Settings | File Templates.
         }

Modified: 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Client.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Client.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Client.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Client.java
 Wed Nov 15 08:07:31 2006
@@ -53,7 +53,11 @@
         AMQDataBlock block = BasicDeliverTest.getDataBlock(size);
 
         InetSocketAddress address = new InetSocketAddress(host, port);
-        ConnectFuture future = new SocketConnector().connect(address, this);
+
+        SocketConnector ioConnector = new SocketConnector();
+        ioConnector.setHandler(this);
+        ConnectFuture future = ioConnector.connect(address);
+
         future.join();
         _session = future.getSession();
 

Modified: 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Server.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Server.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Server.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/codec/Server.java
 Wed Nov 15 08:07:31 2006
@@ -34,7 +34,12 @@
 {
     Server(int port) throws Exception
     {
-        new SocketAcceptor().bind(new InetSocketAddress(port), this);
+
+        SocketAcceptor acceptor = new SocketAcceptor();
+
+        acceptor.setLocalAddress(new InetSocketAddress(port));
+        acceptor.setHandler(this);
+        acceptor.bind();
         System.out.println("Listening on " + port);
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
 Wed Nov 15 08:07:31 2006
@@ -45,10 +45,6 @@
         return null;
     }
 
-    public IoServiceConfig getServiceConfig() {
-        return null;
-    }
-
     public IoHandler getHandler() {
         return null;
     }
@@ -73,7 +69,7 @@
         return null;
     }
 
-    public int getScheduledWriteRequests() {
+    public int getScheduledWriteMessages() {
         return 0;
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/org/apache/qpid/server/cluster/Main.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/org/apache/qpid/server/cluster/Main.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/org/apache/qpid/server/cluster/Main.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/org/apache/qpid/server/cluster/Main.java
 Wed Nov 15 08:07:31 2006
@@ -31,7 +31,6 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -72,8 +71,7 @@
         try
         {
             IoAcceptor acceptor = new SocketAcceptor();
-            SocketAcceptorConfig sconfig = (SocketAcceptorConfig) 
acceptor.getDefaultConfig();
-            SocketSessionConfig sc = (SocketSessionConfig) 
sconfig.getSessionConfig();
+            SocketSessionConfig sc = (SocketSessionConfig) 
acceptor.getSessionConfig();
 
             sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
             sc.setSendBufferSize(connectorConfig.socketWriteBuferSize);
@@ -83,14 +81,16 @@
             // implementation provided by MINA
             if (connectorConfig.enableExecutorPool)
             {
-                sconfig.setThreadModel(new ReadWriteThreadModel());
+                acceptor.setThreadModel(new ReadWriteThreadModel());
             }
 
             String host = InetAddress.getLocalHost().getHostName();
             ClusteredProtocolHandler handler = new 
ClusteredProtocolHandler(new InetSocketAddress(host, port));
             if (connectorConfig.enableNonSSL)
             {
-                acceptor.bind(new InetSocketAddress(port), handler, sconfig);
+                acceptor.setLocalAddress(new InetSocketAddress(port));
+                acceptor.setHandler(handler);
+                acceptor.bind();
                 _logger.info("Qpid.AMQP listening on non-SSL port " + port);
                 handler.connect(commandLine.getOptionValue("j"));
             }
@@ -99,7 +99,9 @@
             {
                 ClusteredProtocolHandler sslHandler = new 
ClusteredProtocolHandler(handler);
                 sslHandler.setUseSSL(true);
-                acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), 
handler, sconfig);
+                acceptor.setLocalAddress(new 
InetSocketAddress(connectorConfig.sslPort));
+                acceptor.setHandler(handler);
+                acceptor.bind();
                 _logger.info("Qpid.AMQP listening on SSL port " + 
connectorConfig.sslPort);
             }
         }

Modified: 
incubator/qpid/trunk/qpid/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
 Wed Nov 15 08:07:31 2006
@@ -32,11 +32,6 @@
         return null;  //TODO
     }
 
-    public IoServiceConfig getServiceConfig()
-    {
-        return null;  //TODO        
-    }
-
     public IoHandler getHandler()
     {
         return null;  //TODO
@@ -222,7 +217,7 @@
         return 0;  //TODO
     }
 
-    public int getScheduledWriteRequests()
+    public int getScheduledWriteMessages()
     {
         return 0;  //TODO
     }

Modified: 
incubator/qpid/trunk/qpid/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
Binary files - no diff available.

Modified: 
incubator/qpid/trunk/qpid/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
Binary files - no diff available.

Modified: 
incubator/qpid/trunk/qpid/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
Binary files - no diff available.

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java?view=diff&rev=475286&r1=475285&r2=475286
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java
 Wed Nov 15 08:07:31 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -52,11 +52,6 @@
         return null;  //To change body of implemented methods use File | 
Settings | File Templates.
     }
 
-    public IoServiceConfig getServiceConfig()
-    {
-        return null;
-    }
-
     public IoHandler getHandler()
     {
         return null;  //To change body of implemented methods use File | 
Settings | File Templates.
@@ -249,7 +244,7 @@
         return 0;  //To change body of implemented methods use File | Settings 
| File Templates.
     }
 
-    public int getScheduledWriteRequests()
+    public int getScheduledWriteMessages()
     {
         return 0;  //To change body of implemented methods use File | Settings 
| File Templates.
     }


Reply via email to