Author: ritchiem
Date: Wed Sep 26 03:45:21 2007
New Revision: 579577

URL: http://svn.apache.org/viewvc?rev=579577&view=rev
Log:
Updated TransportConnection to synchronize around the creation/destruction of 
VM Brokers. I had observed a ConcurrentModificationException in the 
KillAllVMBrokers().

This isn't good this suggests that the tests are overlapping. This fix won't 
address that problem but will stop any CModifications occuring. If there is 
test setup/teardown overlapping we should now see tests failing because the VM 
broker isn't there.

Modified:
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=579577&r1=579576&r2=579577&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 Wed Sep 26 03:45:21 2007
@@ -149,19 +149,21 @@
     {
         int port = details.getPort();
 
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            if (AutoCreate)
+            if (!_inVmPipeAddress.containsKey(port))
             {
-                createVMBroker(port);
-            }
-            else
-            {
-                throw new AMQVMBrokerCreationException(null, port, "VM Broker 
on port " + port
-                    + " does not exist. Auto create disabled.", null);
+                if (AutoCreate)
+                {
+                    createVMBroker(port);
+                }
+                else
+                {
+                    throw new AMQVMBrokerCreationException(null, port, "VM 
Broker on port " + port
+                        + " does not exist. Auto create disabled.", null);
+                }
             }
         }
-
         return new VmPipeTransportConnection(port);
     }
 
@@ -176,69 +178,71 @@
             config.setThreadModel(ReadWriteThreadModel.getInstance());
         }
 
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
-            IoHandlerAdapter provider = null;
-            try
+            if (!_inVmPipeAddress.containsKey(port))
             {
-                VmPipeAddress pipe = new VmPipeAddress(port);
-
-                provider = createBrokerInstance(port);
-
-                _acceptor.bind(pipe, provider);
-
-                _inVmPipeAddress.put(port, pipe);
-                _logger.info("Created InVM Qpid.AMQP listening on port " + 
port);
-            }
-            catch (IOException e)
-            {
-                _logger.error("Got IOException.", e);
-
-                // Try and unbind provider
+                _logger.info("Creating InVM Qpid.AMQP listening on port " + 
port);
+                IoHandlerAdapter provider = null;
                 try
                 {
                     VmPipeAddress pipe = new VmPipeAddress(port);
 
-                    try
-                    {
-                        _acceptor.unbind(pipe);
-                    }
-                    catch (Exception ignore)
-                    {
-                        // ignore
-                    }
-
-                    if (provider == null)
-                    {
-                        provider = createBrokerInstance(port);
-                    }
+                    provider = createBrokerInstance(port);
 
                     _acceptor.bind(pipe, provider);
+
                     _inVmPipeAddress.put(port, pipe);
                     _logger.info("Created InVM Qpid.AMQP listening on port " + 
port);
                 }
-                catch (IOException justUseFirstException)
+                catch (IOException e)
                 {
-                    String because;
-                    if (e.getCause() == null)
+                    _logger.error("Got IOException.", e);
+
+                    // Try and unbind provider
+                    try
                     {
-                        because = e.toString();
+                        VmPipeAddress pipe = new VmPipeAddress(port);
+
+                        try
+                        {
+                            _acceptor.unbind(pipe);
+                        }
+                        catch (Exception ignore)
+                        {
+                            // ignore
+                        }
+
+                        if (provider == null)
+                        {
+                            provider = createBrokerInstance(port);
+                        }
+
+                        _acceptor.bind(pipe, provider);
+                        _inVmPipeAddress.put(port, pipe);
+                        _logger.info("Created InVM Qpid.AMQP listening on port 
" + port);
                     }
-                    else
+                    catch (IOException justUseFirstException)
                     {
-                        because = e.getCause().toString();
-                    }
+                        String because;
+                        if (e.getCause() == null)
+                        {
+                            because = e.toString();
+                        }
+                        else
+                        {
+                            because = e.getCause().toString();
+                        }
 
-                    throw new AMQVMBrokerCreationException(null, port, because 
+ " Stopped binding of InVM Qpid.AMQP", e);
+                        throw new AMQVMBrokerCreationException(null, port, 
because + " Stopped binding of InVM Qpid.AMQP", e);
+                    }
                 }
             }
+            else
+            {
+                _logger.info("InVM Qpid.AMQP on port " + port + " already 
exits.");
+            }
         }
-        else
-        {
-            _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
-        }
-
     }
 
     private static IoHandlerAdapter createBrokerInstance(int port) throws 
AMQVMBrokerCreationException
@@ -285,25 +289,29 @@
     {
         _logger.info("Killing all VM Brokers");
         _acceptor.unbindAll();
-
-        Iterator keys = _inVmPipeAddress.keySet().iterator();
-
-        while (keys.hasNext())
+        synchronized (_inVmPipeAddress)
         {
-            int id = (Integer) keys.next();
-            _inVmPipeAddress.remove(id);
-        }
+            Iterator keys = _inVmPipeAddress.keySet().iterator();
 
+            while (keys.hasNext())
+            {
+                int id = (Integer) keys.next();
+                _inVmPipeAddress.remove(id);
+            }
+        }
     }
 
     public static void killVMBroker(int port)
     {
-        VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
-        if (pipe != null)
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Killing VM Broker:" + port);
-            _inVmPipeAddress.remove(port);
-            _acceptor.unbind(pipe);
+            VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+            if (pipe != null)
+            {
+                _logger.info("Killing VM Broker:" + port);
+                _inVmPipeAddress.remove(port);
+                _acceptor.unbind(pipe);
+            }
         }
     }
 

Modified: 
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=579577&r1=579576&r2=579577&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
 Wed Sep 26 03:45:21 2007
@@ -434,6 +434,13 @@
         verifyMessages(_consumer.receive(1000));
     }
 
+    /**
+     * This test sends two messages receives on of them but doesn't ack it.
+     * The consumer is then closed
+     * the first message should be returned as redelivered.
+     *  the second message should be delivered normally. 
+     * @throws Exception
+     */
     public void testSend2ThenCloseAfter1andTryAgain() throws Exception
     {
         assertTrue("session is not transacted", _session.getTransacted());


Reply via email to