Author: ritchiem
Date: Tue Jul 15 10:03:34 2008
New Revision: 676973
URL: http://svn.apache.org/viewvc?rev=676973&view=rev
Log:
QPID-984 : Applied fix from M2.1.x that adds requried synchronization around
setup and tear down of Connections.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=676973&r1=676972&r2=676973&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Tue Jul 15 10:03:34 2008
@@ -40,7 +40,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.net.Socket;
-
/**
* The TransportConnection is a helper class responsible for connecting to an
AMQ server. It sets up the underlying
* connector, which currently always uses TCP/IP sockets. It creates the
"protocol handler" which deals with MINA
@@ -85,38 +84,18 @@
throw new AMQNoTransportForProtocolException(details, null, null);
}
- /* if (transport == _currentInstance)
- {
- if (transport == VM)
- {
- if (_currentVMPort == details.getPort())
- {
- return _instance;
- }
- }
- else
- {
- return _instance;
- }
- }
-
- _currentInstance = transport;*/
-
- ITransportConnection instance;
switch (transport)
{
case SOCKET:
- instance =
- new SocketTransportConnection(new
SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
- {
- return new ExistingSocketConnector();
- }
- });
- break;
+ return new SocketTransportConnection(new
SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new ExistingSocketConnector();
+ }
+ });
case TCP:
- instance = new SocketTransportConnection(new
SocketTransportConnection.SocketConnectorFactory()
+ return new SocketTransportConnection(new
SocketTransportConnection.SocketConnectorFactory()
{
public IoConnector newSocketConnector()
{
@@ -125,8 +104,8 @@
if (Boolean.getBoolean("qpidnio"))
{
_logger.warn("Using Qpid MultiThreaded NIO - " +
(System.getProperties().containsKey("qpidnio")
- ? "Qpid NIO
is new default"
- :
"Sysproperty 'qpidnio' is set"));
+
? "Qpid NIO is new default"
+
: "Sysproperty 'qpidnio' is set"));
result = new MultiThreadSocketConnector();
}
else
@@ -141,18 +120,13 @@
return result;
}
});
- break;
case VM:
{
- instance = getVMTransport(details,
Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
+ return getVMTransport(details,
Boolean.getBoolean("amqj.AutoCreateVMBroker"));
}
default:
- // FIXME: TGM
- throw new AMQNoTransportForProtocolException(details, null,
null);
+ throw new AMQNoTransportForProtocolException(details,
"Transport not recognised:" + transport, null);
}
-
- return instance;
}
private static int getTransport(String transport)
@@ -180,13 +154,21 @@
{
int port = details.getPort();
- if (!_inVmPipeAddress.containsKey(port))
+ synchronized (_inVmPipeAddress)
{
- if (AutoCreate)
+ if (!_inVmPipeAddress.containsKey(port))
{
if (AutoCreate)
{
- createVMBroker(port);
+ if (AutoCreate)
+ {
+ createVMBroker(port);
+ }
+ else
+ {
+ throw new AMQVMBrokerCreationException(null, port, "VM
Broker on port " + port
+ + "
does not exist. Auto create disabled.", null);
+ }
}
else
{
@@ -194,11 +176,6 @@
+ "
does not exist. Auto create disabled.", null);
}
}
- else
- {
- throw new AMQVMBrokerCreationException(null, port, "VM Broker
on port " + port
- + " does
not exist. Auto create disabled.", null);
- }
}
return new VmPipeTransportConnection(port);
@@ -214,70 +191,73 @@
config.setThreadModel(ReadWriteThreadModel.getInstance());
}
-
- if (!_inVmPipeAddress.containsKey(port))
+ synchronized (_inVmPipeAddress)
{
- _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
- IoHandlerAdapter provider = null;
- try
- {
- VmPipeAddress pipe = new VmPipeAddress(port);
-
- provider = createBrokerInstance(port);
-
- _acceptor.bind(pipe, provider);
- _inVmPipeAddress.put(port, pipe);
- _logger.info("Created InVM Qpid.AMQP listening on port " +
port);
- }
- catch (IOException e)
+ if (!_inVmPipeAddress.containsKey(port))
{
- _logger.error("Got IOException.", e);
-
- // Try and unbind provider
+ _logger.info("Creating InVM Qpid.AMQP listening on port " +
port);
+ IoHandlerAdapter provider = null;
try
{
VmPipeAddress pipe = new VmPipeAddress(port);
- try
- {
- _acceptor.unbind(pipe);
- }
- catch (Exception ignore)
- {
- // ignore
- }
-
- if (provider == null)
- {
- provider = createBrokerInstance(port);
- }
+ provider = createBrokerInstance(port);
_acceptor.bind(pipe, provider);
+
_inVmPipeAddress.put(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " +
port);
}
- catch (IOException justUseFirstException)
+ catch (IOException e)
{
- String because;
- if (e.getCause() == null)
+ _logger.error("Got IOException.", e);
+
+ // Try and unbind provider
+ try
{
- because = e.toString();
+ VmPipeAddress pipe = new VmPipeAddress(port);
+
+ try
+ {
+ _acceptor.unbind(pipe);
+ }
+ catch (Exception ignore)
+ {
+ // ignore
+ }
+
+ if (provider == null)
+ {
+ provider = createBrokerInstance(port);
+ }
+
+ _acceptor.bind(pipe, provider);
+ _inVmPipeAddress.put(port, pipe);
+ _logger.info("Created InVM Qpid.AMQP listening on port
" + port);
}
- else
+ catch (IOException justUseFirstException)
{
- because = e.getCause().toString();
- }
+ String because;
+ if (e.getCause() == null)
+ {
+ because = e.toString();
+ }
+ else
+ {
+ because = e.getCause().toString();
+ }
- throw new AMQVMBrokerCreationException(null, port, because
+ " Stopped binding of InVM Qpid.AMQP", e);
+ throw new AMQVMBrokerCreationException(null, port,
because + " Stopped binding of InVM Qpid.AMQP", e);
+ }
}
+
+ }
+ else
+ {
+ _logger.info("InVM Qpid.AMQP on port " + port + " already
exits.");
}
}
- else
- {
- _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
- }
-
}
private static IoHandlerAdapter createBrokerInstance(int port) throws
AMQVMBrokerCreationException
@@ -324,7 +304,7 @@
_logger.info("Killing all VM Brokers");
if (_acceptor != null)
{
- _acceptor.unbindAll();
+ _acceptor.unbindAll();
}
synchronized (_inVmPipeAddress)
{
@@ -337,14 +317,17 @@
public static void killVMBroker(int port)
{
- VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
- if (pipe != null)
+ synchronized (_inVmPipeAddress)
{
- _logger.info("Killing VM Broker:" + port);
- _inVmPipeAddress.remove(port);
- // This does need to be sychronized as otherwise mina can hang
- // if a new connection is made
- _acceptor.unbind(pipe);
+ VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+ if (pipe != null)
+ {
+ _logger.info("Killing VM Broker:" + port);
+ _inVmPipeAddress.remove(port);
+ // This does need to be sychronized as otherwise mina can hang
+ // if a new connection is made
+ _acceptor.unbind(pipe);
+ }
}
}