// Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java
// URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java?view=auto&rev=522988
// ==============================================================================
// --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java (added)
// +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java Tue Mar 27 09:31:05 2007
// @@ -0,0 +1,412 @@
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.nclient.transport;

import org.apache.qpid.url.URLHelper;
import org.apache.qpid.url.URLSyntaxException;

import java.util.*;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;

/**
 * {@link ConnectionURL} implementation that parses an <code>amqp://</code> URL string.
 *
 * The URI components are mapped as follows: user info becomes username/password,
 * host becomes the client name, path becomes the virtual host and the query string
 * becomes the option map. The <code>brokerlist</code> and <code>failover</code>
 * options are pulled out of the option map and stored separately.
 *
 * NOTE(review): every <code>URLHelper.parseError(...)</code> call below is written as if it
 * always throws {@link URLSyntaxException}; URLHelper is not visible here - confirm.
 */
public class AMQPConnectionURL implements ConnectionURL
{
    /** The URL string exactly as handed to the constructor. */
    private final String _url;
    /** Failover method name, or null when no failover option was supplied. */
    private String _failoverMethod;
    /** Options attached to the failover method (the part after its '?'). */
    private final HashMap<String, String> _failoverOptions;
    /** Remaining query options once brokerlist and failover have been removed. */
    private final HashMap<String, String> _options;
    /** Brokers parsed from the brokerlist option, in declaration order. */
    private final List<BrokerDetails> _brokers;
    private String _clientName;
    private String _username;
    private String _password;
    private String _virtualHost;

    /**
     * Parses the given connection URL.
     *
     * @param fullURL the complete <code>amqp://</code> URL
     * @throws URLSyntaxException if the URL cannot be parsed (raised directly, or via
     *                            URLHelper.parseError - see the class note)
     */
    public AMQPConnectionURL(String fullURL) throws URLSyntaxException
    {
        _url = fullURL;
        _options = new HashMap<String, String>();
        _brokers = new LinkedList<BrokerDetails>();
        _failoverOptions = new HashMap<String, String>();

        try
        {
            URI connection = new URI(fullURL);

            if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
            {
                throw new URISyntaxException(fullURL, "Not an AMQP URL");
            }

            // The host part of the URL doubles as the client name; generate one when absent.
            if (connection.getHost() == null || connection.getHost().equals(""))
            {
                String uid = getUniqueClientID();
                if (uid == null)
                {
                    URLHelper.parseError(-1, "Client Name not specified", fullURL);
                }
                else
                {
                    setClientName(uid);
                }

            }
            else
            {
                setClientName(connection.getHost());
            }

            String userInfo = connection.getUserInfo();

            if (userInfo == null)
            {
                //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
                userInfo = connection.getAuthority();

                if (userInfo != null)
                {
                    int atIndex = userInfo.indexOf('@');

                    if (atIndex != -1)
                    {
                        userInfo = userInfo.substring(0, atIndex);
                    }
                    else
                    {
                        userInfo = null;
                    }
                }

            }

            if (userInfo == null)
            {
                URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
                                     "User information not found on url", fullURL);
            }
            else
            {
                parseUserInfo(userInfo);
            }

            String virtualHost = connection.getPath();

            if (virtualHost != null && (!virtualHost.equals("")))
            {
                setVirtualHost(virtualHost);
            }
            else
            {
                // Guard against a missing authority so we report a parse error rather than an NPE.
                String authority = connection.getAuthority();
                int authLength = (authority == null) ? 0 : authority.length();
                int start = AMQ_PROTOCOL.length() + 3;
                int testIndex = start + authLength;
                if (authority != null && testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
                {
                    URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
                }
                else
                {
                    URLHelper.parseError(-1, "Virtual host not specified", fullURL);
                }

            }


            URLHelper.parseOptions(_options, connection.getQuery());

            processOptions();

            //Fragment is #string (not used)
            //System.out.println(connection.getFragment());

        }
        catch (URISyntaxException uris)
        {
            // Our own, already descriptive, exception type: pass it straight through.
            if (uris instanceof URLSyntaxException)
            {
                throw (URLSyntaxException) uris;
            }

            // NOTE(review): this searches for a BACKSLASH, yet the messages below say
            // "forward slash". The strings are left untouched because callers may match on them.
            int slash = fullURL.indexOf("\\");

            if (slash == -1)
            {
                URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
            }
            else
            {
                if (slash != 0 && fullURL.charAt(slash - 1) == ':')
                {
                    URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
                }
                else
                {
                    URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
                }
            }

        }
    }

    /**
     * Builds a fallback client name from the local host name plus the current time in millis.
     *
     * @return the generated id, or null if the local host cannot be resolved
     */
    private String getUniqueClientID()
    {
        try
        {
            InetAddress addr = InetAddress.getLocalHost();
            return addr.getHostName() + System.currentTimeMillis();
        }
        catch (UnknownHostException e)
        {
            // Caller treats null as "no client name available" and reports a parse error.
            return null;
        }
    }

    /**
     * Splits <code>user:pass</code> at the first ':' into username and password.
     *
     * @param userinfo the user info portion of the URL
     * @throws URLSyntaxException presumably raised by parseError when there is no ':'
     */
    private void parseUserInfo(String userinfo) throws URLSyntaxException
    {
        //user info = user:pass

        int colonIndex = userinfo.indexOf(':');

        if (colonIndex == -1)
        {
            URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
                                 "Null password in user information not allowed.", _url);
        }
        else
        {
            setUsername(userinfo.substring(0, colonIndex));
            setPassword(userinfo.substring(colonIndex + 1));
        }

    }

    /**
     * Extracts the brokerlist and failover options from the option map.
     * Both keys are removed from the map once consumed.
     */
    private void processOptions() throws URLSyntaxException
    {
        if (_options.containsKey(OPTIONS_BROKERLIST))
        {
            String brokerlist = _options.get(OPTIONS_BROKERLIST);

            //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
            StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);

            while (st.hasMoreTokens())
            {
                String broker = st.nextToken();

                _brokers.add(new AMQPBrokerDetails(broker));
            }

            _options.remove(OPTIONS_BROKERLIST);
        }

        if (_options.containsKey(OPTIONS_FAILOVER))
        {
            String failover = _options.get(OPTIONS_FAILOVER);

            // failover='method?option='value',option='value''

            int methodIndex = failover.indexOf('?');

            if (methodIndex > -1)
            {
                _failoverMethod = failover.substring(0, methodIndex);
                URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1));
            }
            else
            {
                _failoverMethod = failover;
            }

            _options.remove(OPTIONS_FAILOVER);
        }
    }

    public String getURL()
    {
        return _url;
    }

    public String getFailoverMethod()
    {
        return _failoverMethod;
    }

    public String getFailoverOption(String key)
    {
        return _failoverOptions.get(key);
    }

    public void setFailoverOption(String key, String value)
    {
        _failoverOptions.put(key, value);
    }

    public int getBrokerCount()
    {
        return _brokers.size();
    }

    /**
     * @param index zero-based broker position
     * @return the broker at that position, or null when the index is out of range
     *         (negative indices included)
     */
    public BrokerDetails getBrokerDetails(int index)
    {
        if (index >= 0 && index < _brokers.size())
        {
            return _brokers.get(index);
        }
        else
        {
            return null;
        }
    }

    /** Adds the broker unless an equal one is already present. */
    public void addBrokerDetails(BrokerDetails broker)
    {
        if (!(_brokers.contains(broker)))
        {
            _brokers.add(broker);
        }
    }

    /** @return the live internal broker list (not a copy) */
    public List<BrokerDetails> getAllBrokerDetails()
    {
        return _brokers;
    }

    public String getClientName()
    {
        return _clientName;
    }

    public void setClientName(String clientName)
    {
        _clientName = clientName;
    }

    public String getUsername()
    {
        return _username;
    }

    public void setUsername(String username)
    {
        _username = username;
    }

    public String getPassword()
    {
        return _password;
    }

    public void setPassword(String password)
    {
        _password = password;
    }

    public String getVirtualHost()
    {
        return _virtualHost;
    }

    public void setVirtualHost(String virtuaHost)
    {
        _virtualHost = virtuaHost;
    }

    public String getOption(String key)
    {
        return _options.get(key);
    }

    public void setOption(String key, String value)
    {
        _options.put(key, value);
    }

    /**
     * Rebuilds a URL string from the current field values.
     *
     * NOTE(review): the password is emitted in clear text, so be careful when logging this.
     */
    @Override
    public String toString()
    {
        StringBuilder sb = new StringBuilder();

        sb.append(AMQ_PROTOCOL);
        sb.append("://");

        if (_username != null)
        {
            sb.append(_username);

            if (_password != null)
            {
                sb.append(':');
                sb.append(_password);
            }

            sb.append('@');
        }

        sb.append(_clientName);

        sb.append(_virtualHost);

        sb.append(optionsToString());

        return sb.toString();
    }

    /**
     * Renders the brokerlist (always) and the failover option (when set) as a query string.
     * An empty broker list yields <code>?brokerlist=''</code>.
     */
    private String optionsToString()
    {
        StringBuilder sb = new StringBuilder();

        sb.append("?" + OPTIONS_BROKERLIST + "='");

        // Separator goes between entries only, so there is no trailing ';' to strip
        // (stripping unconditionally ate the opening quote when the list was empty).
        boolean first = true;
        for (BrokerDetails service : _brokers)
        {
            if (!first)
            {
                sb.append(';');
            }
            sb.append(service.toString());
            first = false;
        }

        sb.append("'");

        if (_failoverMethod != null)
        {
            sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
            sb.append(OPTIONS_FAILOVER + "='");
            sb.append(_failoverMethod);
            sb.append(URLHelper.printOptions(_failoverOptions));
            sb.append("'");
        }

        return sb.toString();
    }


    /** Ad-hoc manual check: only prints a sample URL, the parsing calls are commented out. */
    public static void main(String[] args) throws URLSyntaxException
    {

        String url2 = "amqp://ritchiem:[EMAIL PROTECTED]'tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
        //"amqp://user:[EMAIL PROTECTED]/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";

        //ConnectionURL connectionurl2 = new AMQConnectionURL(url2);

        System.out.println(url2);
        //System.out.println(connectionurl2);

    }

}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java?view=auto&rev=522988 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java (added) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java Tue Mar 27 09:31:05 2007 @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.nclient.transport; + +public interface BrokerDetails +{ + + /* + * Known URL Options + * @see ConnectionURL + */ + public static final String OPTIONS_RETRY = "retries"; + public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL; + public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; + public static final int DEFAULT_PORT = 5672; + + public static final String TCP = "tcp"; + public static final String VM = "vm"; + + public static final String DEFAULT_TRANSPORT = TCP; + + public static final String URL_FORMAT_EXAMPLE = + "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]"; + + public static final long DEFAULT_CONNECT_TIMEOUT = 30000L; + public static final boolean USE_SSL_DEFAULT = false; + + String getHost(); + + void setHost(String host); + + int getPort(); + + void setPort(int port); + + String getTransport(); + + void setTransport(String transport); + + boolean useSSL(); + + void useSSL(boolean ssl); + + String getOption(String key); + + void setOption(String key, String value); + + long getTimeout(); + + void setTimeout(long timeout); + + String toString(); + + boolean equals(Object o); +} Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java?view=auto&rev=522988 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java (added) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java Tue Mar 27 09:31:05 2007 @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import java.util.List; + +/** + Connection URL format + For TCP: + amqp://[user:[EMAIL PROTECTED]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\' + + For VMBroker: + vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''" + + Options are of course optional except for requiring a single broker in the broker list. 
+ The option seperator is defined to be either '&' or ',' + */ +public interface ConnectionURL +{ + public static final String AMQ_PROTOCOL = "amqp"; + public static final String OPTIONS_BROKERLIST = "brokerlist"; + public static final String OPTIONS_FAILOVER = "failover"; + public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; + public static final String OPTIONS_SSL = "ssl"; + + String getURL(); + + String getFailoverMethod(); + + String getFailoverOption(String key); + + int getBrokerCount(); + + BrokerDetails getBrokerDetails(int index); + + void addBrokerDetails(BrokerDetails broker); + + List<BrokerDetails> getAllBrokerDetails(); + + String getClientName(); + + void setClientName(String clientName); + + String getUsername(); + + void setUsername(String username); + + String getPassword(); + + void setPassword(String password); + + String getVirtualHost(); + + void setVirtualHost(String virtualHost); + + String getOption(String key); + + void setOption(String key, String value); +} Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java?view=auto&rev=522988 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java (added) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java Tue Mar 27 09:31:05 2007 @@ -0,0 +1,82 @@ +package org.apache.qpid.nclient.transport; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.transport.socket.nio.SocketConnector; +import 
org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.DefaultPhaseContext; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.PhaseFactory; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.pool.ReadWriteThreadModel; + +public class TCPConnection implements TransportConnection +{ + private static final Logger _logger = Logger.getLogger(TCPConnection.class); + private BrokerDetails _brokerDetails; + private IoConnector _ioConnector; + private Phase _phase; + + protected TCPConnection(ConnectionURL url) + { + _brokerDetails = url.getBrokerDetails(0); + + ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS)); + + // the MINA default is currently to use the pooled allocator although this may change in future + // once more testing of the performance of the simple allocator has been done + if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR)) + { + // Not sure what the original code wanted use :) + } + else + { + _logger.info("Using SimpleByteBufferAllocator"); + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + final IoConnector ioConnector = new SocketConnector(); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + + // if we do not use our own thread model we get the MINA default which is to use + // its own leader-follower model + if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL)) + { + cfg.setThreadModel(ReadWriteThreadModel.getInstance()); + } + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + 
scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY)); + scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024); + scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024); + } + + // Returns the phase pipe + public Phase connect() throws AMQPException + { + PhaseContext ctx = new DefaultPhaseContext(); + ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); + ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + + _phase = PhaseFactory.createPhasePipe(ctx); + _phase.start(); + + return _phase; + } + + public void close() throws AMQPException + { + + } + + public Phase getPhasePipe() + { + return _phase; + } +} Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java?view=auto&rev=522988 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java (added) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java Tue Mar 27 09:31:05 2007 @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + + +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +public interface TransportConnection +{ + public Phase connect()throws AMQPException; + + public void close()throws AMQPException; + + public Phase getPhasePipe(); +} \ No newline at end of file Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java?view=auto&rev=522988 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java (added) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java Tue Mar 27 09:31:05 2007 @@ -0,0 +1,44 @@ +package org.apache.qpid.nclient.transport; + +import java.net.URISyntaxException; + +public class TransportConnectionFactory +{ + public enum ConnectionType + { + TCP,VM + } + + public static TransportConnection createTransportConnection(String url,ConnectionType type) throws URISyntaxException + { + return createTransportConnection(new AMQPConnectionURL(url),type); + + } + + public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type) + { + switch (type) + { + case 
TCP : default: + { + return createTCPConnection(url); + } + + case VM : + { + return createVMConnection(url); + } + } + + } + + private static TransportConnection createTCPConnection(ConnectionURL url) + { + return new TCPConnection(url); + } + + private static TransportConnection createVMConnection(ConnectionURL url) + { + return null; + } +} Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java?view=auto&rev=522988 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java (added) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java Tue Mar 27 09:31:05 2007 @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.nclient.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.WriteFuture; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.mina.transport.vmpipe.VmPipeConnector; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.ssl.BogusSSLContextFactory; + +/** + * The Transport Phase corresponds to the Layer 1 in AMQP It works at the Frame + * layer + * + */ +public class TransportPhase extends AbstractPhase implements IoHandler, ProtocolVersionList +{ + + private static final Logger _logger = Logger + .getLogger(TransportPhase.class); + + private IoSession _ioSession; + private BrokerDetails _brokerDetails; + + protected WriteFuture _lastWriteFuture; + + /** + * ------------------------------------------------ + * Phase - methods introduced by Phase + * ------------------------------------------------ + */ + + public void start()throws AMQPException + { + _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS); + IoConnector 
ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR); + + final SocketAddress address; + if (ioConnector instanceof VmPipeConnector) + { + address = new VmPipeAddress(_brokerDetails.getPort()); + } + else + { + address = new InetSocketAddress(_brokerDetails.getHost(), _brokerDetails.getPort()); + _logger.info("Attempting connection to " + address); + + } + + ConnectFuture future = ioConnector.connect(address,this); + + // wait for connection to complete + if (future.join(_brokerDetails.getTimeout())) + { + // we call getSession which throws an IOException if there has been an error connecting + future.getSession(); + } + else + { + throw new AMQPException("Timeout waiting for connection."); + } + } + + public void messageReceived(Object frame) throws AMQPException + { + super.messageReceived(frame); + } + + public void messageSent(Object frame) throws AMQPException + { + + _ioSession.write(frame); + } + + /** + * ------------------------------------------------ + * IoHandler - methods introduced by IoHandler + * ------------------------------------------------ + */ + public void sessionIdle(IoSession session, IdleStatus status) + throws Exception + { + _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + + status); + if (IdleStatus.WRITER_IDLE.equals(status)) + { + // write heartbeat frame: + _logger.debug("Sent heartbeat"); + session.write(HeartbeatBody.FRAME); + // HeartbeatDiagnostics.sent(); + } else if (IdleStatus.READER_IDLE.equals(status)) + { + // failover: + // HeartbeatDiagnostics.timeout(); + _logger.warn("Timed out while waiting for heartbeat from peer."); + session.close(); + } + } + + public void messageReceived(IoSession session, Object message) + throws Exception + { + AMQFrame frame = (AMQFrame) message; + final AMQBody bodyFrame = frame.getBodyFrame(); + + if (bodyFrame instanceof HeartbeatBody) + { + _logger.debug("Received heartbeat"); + } else + { + messageReceived(bodyFrame); + } + // 
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + } + + public void messageSent(IoSession session, Object message) throws Exception + { + _logger.debug("Sent frame " + message); + } + + public void exceptionCaught(IoSession session, Throwable cause) + throws Exception + { + // Need to handle failover + sessionClosed(session); + } + + public void sessionClosed(IoSession session) throws Exception + { + // Need to handle failover + _logger.info("Protocol Session [" + this + "] closed"); + } + + public void sessionCreated(IoSession session) throws Exception + { + _logger.debug("Protocol session created for session " + + System.identityHashCode(session)); + + final ProtocolCodecFilter pcf = new ProtocolCodecFilter( + new AMQCodecFactory(false)); + + if (ClientConfiguration.get().getBoolean( + QpidConstants.USE_SHARED_READ_WRITE_POOL)) + { + session.getFilterChain().addBefore("AsynchronousWriteFilter", + "protocolFilter", pcf); + } else + { + session.getFilterChain().addLast("protocolFilter", pcf); + } + // we only add the SSL filter where we have an SSL connection + if (_brokerDetails.useSSL()) + { + // FIXME: Bogus context cannot be used in production. 
+ SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory + .getInstance(false)); + sslFilter.setUseClientMode(true); + session.getFilterChain().addBefore("protocolFilter", "ssl", + sslFilter); + } + + try + { + + ReadWriteThreadModel threadModel = ReadWriteThreadModel + .getInstance(); + threadModel.getAsynchronousReadFilter().createNewJobForSession( + session); + threadModel.getAsynchronousWriteFilter().createNewJobForSession( + session); + } catch (RuntimeException e) + { + e.printStackTrace(); + } + + doAMQPConnectionNegotiation(); + } + + public void sessionOpened(IoSession session) throws Exception + { + _logger.debug("Protocol session opened for session " + + System.identityHashCode(session)); + } + + /** + * ---------------------------------------------------------- + * Protocol related methods + * ---------------------------------------------------------- + */ + private void doAMQPConnectionNegotiation() + { + int i = pv.length - 1; + writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + } + + /** + * ---------------------------------------------------------- + * Write Operations + * ---------------------------------------------------------- + */ + public void writeFrame(AMQDataBlock frame) + { + writeFrame(frame, false); + } + + public void writeFrame(AMQDataBlock frame, boolean wait) + { + WriteFuture f = _ioSession.write(frame); + if (wait) + { + // fixme -- time out? 
+ f.join(); + } else + { + _lastWriteFuture = f; + } + } + + /** + * ----------------------------------------------------------- Failover + * section ----------------------------------------------------------- + */ +} Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java?view=auto&rev=522988 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java (added) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java Tue Mar 27 09:31:05 2007 @@ -0,0 +1,133 @@ +package org.apache.qpid.nclient.transport; + +import java.io.IOException; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.vmpipe.VmPipeAcceptor; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.mina.transport.vmpipe.VmPipeConnector; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.DefaultPhaseContext; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.PhaseFactory; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.pool.PoolingFilter; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.pool.ReferenceCountingExecutorService; + +/** TransportConnection that, on connect(), binds a broker handler to an in-VM pipe address through a MINA VmPipeAcceptor and then starts a phase pipe configured with a VmPipeConnector. */ public class VMConnection implements TransportConnection +{ + private static final Logger _logger = Logger.getLogger(VMConnection.class); + private BrokerDetails 
_brokerDetails; /* broker entry at index 0 of the connection URL; its port addresses the in-VM pipe */ + private IoConnector _ioConnector; + private Phase _phase; /* assigned by connect(); null until then */ + + /** Takes broker 0 from the URL and creates a VmPipeConnector whose default filter chain gets an asynchronous-read pooling filter first and an asynchronous-write pooling filter last, both built on ReferenceCountingExecutorService.getInstance(). @param url connection URL; only getBrokerDetails(0) is read */ protected VMConnection(ConnectionURL url) + { + _brokerDetails = url.getBrokerDetails(0); + _ioConnector = new VmPipeConnector(); + final IoServiceConfig cfg = _ioConnector.getDefaultConfig(); + ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); + PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService, + "AsynchronousReadFilter"); + cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); + PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService, + "AsynchronousWriteFilter"); + cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); + } + + /** Creates the in-VM broker, then builds a phase pipe from a context carrying the broker details and the IoConnector, starts it and returns it. @return the started phase pipe, also kept in _phase @throws AMQPException if the in-VM broker cannot be created (NOTE(review): createPhasePipe and start may throw it as well - confirm) */ public Phase connect() throws AMQPException + { + createVMBroker(); + + PhaseContext ctx = new DefaultPhaseContext(); + ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); + ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + + _phase = PhaseFactory.createPhasePipe(ctx); + _phase.start(); + + return _phase; + + } + + /** Binds a new VmPipeAcceptor on a VmPipeAddress for the broker port, using the handler returned by createBrokerInstance. On IOException the error is logged, the address is unbound and the failure is rethrown as AMQPException. NOTE(review): _logger.error(e) passes the exception as the message, so presumably no stack trace is logged; the acceptor is a local that is never retained, so close() has no way to unbind it - confirm whether intended. @throws AMQPException if the handler cannot be created or the bind fails */ private void createVMBroker()throws AMQPException + { + _logger.info("Creating InVM Qpid.AMQP listening on port " + _brokerDetails.getPort()); + + VmPipeAcceptor acceptor = new VmPipeAcceptor(); + IoServiceConfig config = acceptor.getDefaultConfig(); + config.setThreadModel(ReadWriteThreadModel.getInstance()); + + IoHandlerAdapter provider = null; + try + { + VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort()); + provider = createBrokerInstance(_brokerDetails.getPort()); + acceptor.bind(pipe, provider); + _logger.info("Created InVM Qpid.AMQP listening on port " + _brokerDetails.getPort()); + } + catch (IOException e) + { + _logger.error(e); + VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort()); + acceptor.unbind(pipe); + + throw new AMQPException("Error creating VM broker",e); + } + } + + /** Reflectively instantiates the class named by the QPID_VM_BROKER_CLASS configuration value through its single-argument (Integer) constructor, passing the port. @param port in-VM port; handed to the provider constructor and used in log and exception text @return the newly created handler @throws AMQPException wrapping any failure; its message uses the cause when one exists, otherwise the exception itself */ private IoHandlerAdapter createBrokerInstance(int port) throws 
AMQPException + { + String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS); + _logger.info("Creating Qpid protocol provider: " + protocolProviderClass); + + // can't use introspection to get Provider as it is a server class. + // need to go straight to IoHandlerAdapter but that requires the queues and exchange from the ApplicationRegistry which we can't access. + + //get correct constructor and pass in instance ID - "port" + IoHandlerAdapter provider; + try + { + /* the lookup is for a constructor taking java.lang.Integer, not int; port is autoboxed into params */ Class[] cnstr = {Integer.class}; + Object[] params = {port}; + provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + //Give the broker a second to create - NOTE(review): no wait is actually performed here + _logger.info("Created VMBroker Instance:" + port); + } + catch (Exception e) + { + _logger.info("Unable to create InVM Qpid broker on port " + port + ". due to : " + e.getCause()); + _logger.error(e); + /* prefer the underlying cause for the message; fall back to the exception itself when there is none */ String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + because = e.getCause().toString(); + } + + + throw new AMQPException(port, because + " Stopped InVM Qpid.AMQP creation",e); + } + + return provider; + } + + /** Currently a no-op. NOTE(review): neither the acceptor bound in createVMBroker nor the phase pipe is released here - confirm whether intended. */ public void close() throws AMQPException + { + + } + + /** @return the phase pipe assigned by connect(), or null if connect() has not assigned one */ public Phase getPhasePipe() + { + return _phase; + } +} Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java?view=auto&rev=522988 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java (added) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java Tue Mar 27 09:31:05 2007 @@ -0,0 +1,14 @@ +package org.apache.qpid.nclient.util; + +import 
org.apache.qpid.nclient.core.AMQPException; + +/** Static validation helper that reports a failed check as an AMQPException. */ public class AMQPValidator +{ + /** Throws an AMQPException carrying msg when obj is null; otherwise returns normally. @param obj the reference to check @param msg message for the exception @throws AMQPException if obj is null */ public static void throwExceptionOnNull(Object obj, String msg) throws AMQPException + { + if(obj == null) + { + throw new AMQPException(msg); + } + } +}
