Author: ritchiem
Date: Wed Nov 15 08:17:10 2006
New Revision: 475294
URL: http://svn.apache.org/viewvc?view=rev&rev=475294
Log:
QPID-92: Changes to bring MINA usage up to compatibility with MINA head (1.1.0).
Sorry, forgot this file.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java?view=diff&rev=475294&r1=475293&r2=475294
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java
(original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java
Wed Nov 15 08:17:10 2006
@@ -46,7 +46,6 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import javax.management.JMException;
@@ -65,7 +64,6 @@
/**
* Main entry point for AMQPD.
- *
*/
public class Main implements ProtocolVersionList
{
@@ -122,10 +120,10 @@
Option bind =
OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the
specified address. Overrides any value in the config file").
withLongOpt("bind").create("b");
Option logconfig =
OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the
specified log4j xml configuration file. By " +
- "default looks for a file named " +
DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration
file").
+
"default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + "
in the same directory as the configuration file").
withLongOpt("logconfig").create("l");
Option logwatchconfig =
OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log
file configuration file for changes. Units are seconds. " +
- "Zero means do not check for
changes.").withLongOpt("logwatch").create("w");
+
"Zero means do not check for
changes.").withLongOpt("logwatch").create("w");
options.addOption(help);
options.addOption(version);
@@ -149,10 +147,12 @@
{
String ver = "Qpid 0.9.0.0";
String protocol = "AMQP version(s) [major.minor]: ";
- for (int i=0; i<pv.length; i++)
+ for (int i = 0; i < pv.length; i++)
{
if (i > 0)
+ {
protocol += ", ";
+ }
protocol += pv[i][PROTOCOL_MAJOR] + "." +
pv[i][PROTOCOL_MINOR];
}
System.out.println(ver + " (" + protocol + ")");
@@ -223,7 +223,8 @@
ConnectorConfiguration connectorConfig =
ApplicationRegistry.getInstance().
getConfiguredObject(ConnectorConfiguration.class);
- ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
+ // From old Mina
+ //ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
// the MINA default is currently to use the pooled allocator although
this may change in future
// once more testing of the performance of the simple allocator has
been done
@@ -258,12 +259,12 @@
int totalVHosts = ((Collection) virtualHosts).size();
for (int vhost = 0; vhost < totalVHosts; vhost++)
{
- setupVirtualHosts(configFile.getParent() ,
(String)((List)virtualHosts).get(vhost));
+ setupVirtualHosts(configFile.getParent(), (String) ((List)
virtualHosts).get(vhost));
}
}
else
{
- setupVirtualHosts(configFile.getParent() ,
(String)virtualHosts);
+ setupVirtualHosts(configFile.getParent(), (String)
virtualHosts);
}
}
bind(port, connectorConfig);
@@ -280,7 +281,7 @@
configFilePath = configFileParent +
configFilePath.substring(configVar.length());
}
- if (configFilePath.indexOf(".xml") != -1 )
+ if (configFilePath.indexOf(".xml") != -1)
{
VirtualHostConfiguration vHostConfig = new
VirtualHostConfiguration(configFilePath);
vHostConfig.performBindings();
@@ -293,11 +294,11 @@
String[] fileNames = virtualHostDir.list();
- for (int each=0; each < fileNames.length; each++)
+ for (int each = 0; each < fileNames.length; each++)
{
if (fileNames[each].endsWith(".xml"))
{
- VirtualHostConfiguration vHostConfig = new
VirtualHostConfiguration(configFilePath+"/"+fileNames[each]);
+ VirtualHostConfiguration vHostConfig = new
VirtualHostConfiguration(configFilePath + "/" + fileNames[each]);
vHostConfig.performBindings();
}
}
@@ -316,8 +317,22 @@
{
//IoAcceptor acceptor = new
SocketAcceptor(connectorConfig.processors);
IoAcceptor acceptor = connectorConfig.createAcceptor();
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig)
acceptor.getDefaultConfig();
- SocketSessionConfig sc = (SocketSessionConfig)
sconfig.getSessionConfig();
+
+ SocketSessionConfig sc;
+
+ //fixme improve get to use configuration
+ if (connectorConfig.qpidNIO)
+ {
+ //FIXME - this needs to be sorted to use the new Mina
MultiThread SA.
+ _logger.warn("Using Qpid NIO - DISABLED");
+ //sconfig =
(org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptorConfig)
acceptor.getDefaultConfig();
+ //sc = (SocketSessionConfig) sconfig.getSessionConfig();
+ }
+// else
+ {
+ _logger.warn("Using Mina NIO");
+ sc = (SocketSessionConfig) acceptor.getSessionConfig();
+ }
sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
sc.setSendBufferSize(connectorConfig.socketWriteBuferSize);
@@ -327,7 +342,7 @@
// implementation provided by MINA
if (connectorConfig.enableExecutorPool)
{
- sconfig.setThreadModel(new ReadWriteThreadModel());
+ acceptor.setThreadModel(new ReadWriteThreadModel());
}
if (connectorConfig.enableNonSSL)
@@ -342,7 +357,9 @@
{
bindAddress = new
InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
}
- acceptor.bind(bindAddress, handler, sconfig);
+ acceptor.setLocalAddress(bindAddress);
+ acceptor.setHandler(handler);
+ acceptor.bind();
_logger.info("Qpid.AMQP listening on non-SSL address " +
bindAddress);
}
@@ -352,8 +369,9 @@
handler.setUseSSL(true);
try
{
- acceptor.bind(new
InetSocketAddress(connectorConfig.sslPort),
- handler, sconfig);
+ acceptor.setLocalAddress(new
InetSocketAddress(connectorConfig.sslPort));
+ acceptor.setHandler(handler);
+ acceptor.bind();
_logger.info("Qpid.AMQP listening on SSL port " +
connectorConfig.sslPort);
}
catch (IOException e)
@@ -408,15 +426,16 @@
catch (NumberFormatException e)
{
System.err.println("Log watch configuration value of " +
logWatchConfig + " is invalid. Must be " +
- "a non-negative integer. Using default of zero (no
watching configured");
+ "a non-negative integer. Using default of zero
(no watching configured");
}
if (logConfigFile.exists() && logConfigFile.canRead())
{
System.out.println("Configuring logger using configuration file "
+ logConfigFile.getAbsolutePath());
+
if (logWatchTime > 0)
{
System.out.println("log file " +
logConfigFile.getAbsolutePath() + " will be checked for changes every " +
- logWatchTime + " seconds");
+ logWatchTime + " seconds");
// log4j expects the watch interval in milliseconds
DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime
* 1000);
}
@@ -441,7 +460,7 @@
}
catch (NotCompliantMBeanException ex)
{
- throw new AMQException("Exception occured in creating
AMQBrokerManager MBean.");
+ throw new AMQException("Exception occured in creating
AMQBrokerManager MBean.");
}
}
@@ -451,24 +470,24 @@
*/
@MBeanDescription("This MBean exposes the broker level management
features")
private final class AMQBrokerManager extends AMQManagedObject
- implements ManagedBroker
+ implements ManagedBroker
{
- private final QueueRegistry _queueRegistry;
+ private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
- private final ExchangeFactory _exchangeFactory;
- private final MessageStore _messageStore;
+ private final ExchangeFactory _exchangeFactory;
+ private final MessageStore _messageStore;
@MBeanConstructor("Creates the Broker Manager MBean")
- protected AMQBrokerManager() throws NotCompliantMBeanException
+ protected AMQBrokerManager() throws NotCompliantMBeanException
{
super(ManagedBroker.class, ManagedBroker.TYPE);
IApplicationRegistry appRegistry =
ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
+ _queueRegistry = appRegistry.getQueueRegistry();
_exchangeRegistry = appRegistry.getExchangeRegistry();
- _exchangeFactory =
ApplicationRegistry.getInstance().getExchangeFactory();
- _messageStore =
ApplicationRegistry.getInstance().getMessageStore();
- }
+ _exchangeFactory =
ApplicationRegistry.getInstance().getExchangeFactory();
+ _messageStore =
ApplicationRegistry.getInstance().getMessageStore();
+ }
public String getObjectInstanceName()
{
@@ -477,6 +496,7 @@
/**
* Creates new exchange and registers it with the registry.
+ *
* @param exchangeName
* @param type
* @param durable
@@ -487,7 +507,7 @@
String type,
boolean durable,
boolean autoDelete)
- throws JMException
+ throws JMException
{
try
{
@@ -498,10 +518,10 @@
if (exchange == null)
{
exchange =
_exchangeFactory.createExchange(exchangeName,
- type,
//eg direct
- durable,
- autoDelete,
- 0);
//ticket no
+ type,
//eg direct
+ durable,
+ autoDelete,
+ 0);
//ticket no
_exchangeRegistry.registerExchange(exchange);
}
else
@@ -510,7 +530,7 @@
}
}
}
- catch(AMQException ex)
+ catch (AMQException ex)
{
_logger.error("Error in creating exchange " + exchangeName,
ex);
throw new MBeanException(ex, ex.toString());
@@ -519,11 +539,12 @@
/**
* Unregisters the exchange from registry.
+ *
* @param exchangeName
* @throws JMException
*/
public void unregisterExchange(String exchangeName)
- throws JMException
+ throws JMException
{
boolean inUse = false;
// TODO
@@ -534,7 +555,7 @@
{
_exchangeRegistry.unregisterExchange(exchangeName, false);
}
- catch(AMQException ex)
+ catch (AMQException ex)
{
_logger.error("Error in unregistering exchange " +
exchangeName, ex);
throw new MBeanException(ex, ex.toString());
@@ -544,6 +565,7 @@
/**
* Creates a new queue and registers it with the registry and puts it
* in persistance storage if durable queue.
+ *
* @param queueName
* @param durable
* @param owner
@@ -554,7 +576,7 @@
boolean durable,
String owner,
boolean autoDelete)
- throws JMException
+ throws JMException
{
AMQQueue queue = _queueRegistry.getQueue(queueName);
if (queue == null)
@@ -582,6 +604,7 @@
/**
* Deletes the queue from queue registry and persistant storage.
+ *
* @param queueName
* @throws JMException
*/