Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java?view=diff&rev=524144&r1=524143&r2=524144 ============================================================================== --- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java (original) +++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java Fri Mar 30 08:53:18 2007 @@ -65,6 +65,7 @@ import org.apache.qpid.nclient.amqp.AMQPExchange; import org.apache.qpid.nclient.amqp.AMQPMessage; import org.apache.qpid.nclient.amqp.AMQPQueue; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; import org.apache.qpid.nclient.transport.AMQPConnectionURL; import org.apache.qpid.nclient.transport.ConnectionURL; import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; @@ -80,371 +81,371 @@ @SuppressWarnings("unused") public class TestClient { - private byte _major; - private byte _minor; - private ConnectionURL _url; - private static int _channel = 2; - // Need a Class factory per connection - private AMQPClassFactory _classFactory = new AMQPClassFactory(); - private int _ticket; - - public AMQPConnection openConnection() throws Exception - { - //_url = new AMQPConnectionURL("amqp://guest:[EMAIL PROTECTED]/localhost?brokerlist='vm://:3'"); - - _url = new AMQPConnectionURL("amqp://guest:[EMAIL PROTECTED]/test?brokerlist='tcp://localhost:5672?'"); - return _classFactory.createConnectionClass(_url,ConnectionType.TCP); - } - - public void handleConnectionNegotiation(AMQPConnection con) throws Exception - { - // ConnectionStartBody - ConnectionStartBody connectionStartBody = con.openTCPConnection(); - _major = connectionStartBody.getMajor(); - _minor = connectionStartBody.getMinor(); - - FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id - - final String locales = new String(connectionStartBody.getLocales(), "utf8"); - final StringTokenizer tokenizer = new StringTokenizer(locales, " "); - - final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); - - SaslClient sc = Sasl.createSaslClient(new String[]{mechanism}, - null, "AMQP", "localhost", - null, SecurityHelper.createCallbackHandler(mechanism,_url)); - - ConnectionStartOkBody connectionStartOkBody = - ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, - new AMQShortString(tokenizer.nextToken()), - new AMQShortString(mechanism), - (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); - // ConnectionSecureBody - AMQMethodBody body = con.startOk(connectionStartOkBody); - ConnectionTuneBody connectionTuneBody; - - if (body instanceof ConnectionSecureBody) - { - ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody)body; - ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody( - _major,_minor,sc.evaluateChallenge(connectionSecureBody.getChallenge())); - //Assuming the server is not going to send another challenge - connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody); - - } - else - { - connectionTuneBody = (ConnectionTuneBody)body; - } - - - // Using broker supplied values - ConnectionTuneOkBody connectionTuneOkBody = - ConnectionTuneOkBody.createMethodBody(_major,_minor, - connectionTuneBody.getChannelMax(), - connectionTuneBody.getFrameMax(), - connectionTuneBody.getHeartbeat()); - con.tuneOk(connectionTuneOkBody); - - ConnectionOpenBody connectionOpenBody = - ConnectionOpenBody.createMethodBody(_major,_minor,null, true,new AMQShortString(_url.getVirtualHost())); - - ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody); - } - - public void handleChannelNegotiation() throws Exception - { - AMQPChannel channel = _classFactory.createChannelClass(_channel); - - ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1")); - ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody); - - //lets have some fun - ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false); - - ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody); - System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend")); - - channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true); - channelFlowOkBody = channel.flow(channelFlowBody); - System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend")); - } - - public void createExchange() throws Exception - { - AMQPExchange exchange = _classFactory.createExchangeClass(_channel); - - ExchangeDeclareBody exchangeDeclareBody = - ExchangeDeclareBody.createMethodBody(_major, _minor, - null, // arguments - false,//auto delete - false,// durable - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), - true, //internal - false,// nowait - false,// passive - _ticket, - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)); - - AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange"); - exchange.declare(exchangeDeclareBody, cb); - // Blocking for response - while (!cb.isComplete()){} - } - - - public void createAndBindQueue()throws Exception - { - AMQPQueue queue = _classFactory.createQueueClass(_channel); - - QueueDeclareBody queueDeclareBody = - QueueDeclareBody.createMethodBody(_major, _minor, - null, //arguments - false,//auto delete - false,// durable - false, //exclusive, - false, //nowait, - false, //passive, - new AMQShortString("MyTestQueue"), - 0); - - AMQPCallBack cb = new AMQPCallBack(){ - - @Override - public void brokerResponded(AMQMethodBody body) - { - QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody)body; - System.out.println("[Broker has created the queue, " + - "message count " + queueDeclareOkBody.getMessageCount() + - "consumer count " + queueDeclareOkBody.getConsumerCount() + "]\n"); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - queue.declare(queueDeclareBody, cb); - //Blocking for response - while (!cb.isComplete()){} - - QueueBindBody queueBindBody = - QueueBindBody.createMethodBody(_major, _minor, - null, //arguments - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange - false, //nowait - new AMQShortString("MyTestQueue"), //queue - new AMQShortString("RH"), //routingKey - 0 //ticket - ); - - cb = createCallBackWithMessage("Broker has bound the queue"); - queue.bind(queueBindBody, cb); - //Blocking for response - while (!cb.isComplete()){} - } - - public void purgeQueue()throws Exception - { - AMQPQueue queue = _classFactory.createQueueClass(_channel); - - QueuePurgeBody queuePurgeBody = - QueuePurgeBody.createMethodBody(_major, _minor, - false, //nowait - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - AMQPCallBack cb = new AMQPCallBack(){ - - @Override - public void brokerResponded(AMQMethodBody body) - { - QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody)body; - System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n"); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - queue.purge(queuePurgeBody, cb); - //Blocking for response - while (!cb.isComplete()){} - - } - - public void deleteQueue()throws Exception - { - AMQPQueue queue = _classFactory.createQueueClass(_channel); - - QueueDeleteBody queueDeleteBody = - QueueDeleteBody.createMethodBody(_major, _minor, - false, //ifEmpty - false, //ifUnused - false, //nowait - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - AMQPCallBack cb = new AMQPCallBack(){ - - @Override - public void brokerResponded(AMQMethodBody body) - { - QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody)body; - System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n"); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - queue.delete(queueDeleteBody, cb); - //Blocking for response - while (!cb.isComplete()){} - - } - - public void publishAndSubscribe() throws Exception - { - AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper()); - MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, - new AMQShortString("myClient"),// destination - false, //exclusive - null, //filter - false, //noAck, - false, //noLocal, - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume"); - message.consume(messageConsumeBody, cb); - //Blocking for response - while (!cb.isComplete()){} - - // Sending 5 messages serially - for (int i=0; i<5; i++) - { - cb = createCallBackWithMessage("Broker has accepted msg " + i); - message.transfer(createMessages("Test" + i),cb); - while (!cb.isComplete()){} - } - - MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient")); - - AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel"); - message.cancel(messageCancelBody, cb2); - - } - - private MessageTransferBody createMessages(String content) throws Exception - { - FieldTable headers = FieldTableFactory.newFieldTable(); - headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + ""); - - MessageTransferBody messageTransferBody = - MessageTransferBody.createMethodBody(_major, _minor, - new AMQShortString("testApp"), //appId - headers, //applicationHeaders - new Content(Content.TypeEnum.INLINE_T,content.getBytes()), //body - new AMQShortString(""), //contentEncoding, - new AMQShortString("text/plain"), //contentType - new AMQShortString("testApp"), //correlationId - (short)1, //deliveryMode non persistant - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange - 0l, //expiration - false, //immediate - false, //mandatory - new AMQShortString(UUID.randomUUID().toString()), //messageId - (short)0, //priority - false, //redelivered - new AMQShortString("RH"), //replyTo - new AMQShortString("RH"), //routingKey, - "abc".getBytes(), //securityToken - 0, //ticket - System.currentTimeMillis(), //timestamp - new AMQShortString(""), //transactionId - 0l, //ttl, - new AMQShortString("Hello") //userId - ); - - return messageTransferBody; - - } - - public void publishAndGet() throws Exception - { - AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper()); - AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5"); - - MessageGetBody messageGetBody = - MessageGetBody.createMethodBody(_major, _minor, - new AMQShortString("myClient"), - false, //noAck - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper()); - message.transfer(createMessages("Test"),cb); - while(!cb.isComplete()){} - - cb = createCallBackWithMessage("Broker has accepted get"); - message.get(messageGetBody, cb); - } - - // Creates a gneric call back and prints the given message - private AMQPCallBack createCallBackWithMessage(final String msg) - { - AMQPCallBack cb = new AMQPCallBack(){ - - @Override - public void brokerResponded(AMQMethodBody body) - { - System.out.println(msg); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - return cb; - } - - public static void main(String[] args) - { - TestClient test = new TestClient(); - try - { - AMQPConnection con = test.openConnection(); - test.handleConnectionNegotiation(con); - test.handleChannelNegotiation(); - test.createExchange(); - test.createAndBindQueue(); - test.publishAndSubscribe(); - test.purgeQueue(); - test.publishAndGet(); - test.deleteQueue(); + private byte _major; + + private byte _minor; + + private ConnectionURL _url; + + private static int _channel = 2; + + // Need a Class factory per connection + private AMQPClassFactory _classFactory = new AMQPClassFactory(); + + private int _ticket; + + public AMQPConnection openConnection() throws Exception + { + //_url = new AMQPConnectionURL("amqp://guest:[EMAIL PROTECTED]/localhost?brokerlist='vm://:3'"); + + _url = new AMQPConnectionURL("amqp://guest:[EMAIL PROTECTED]/test?brokerlist='tcp://localhost:5672?'"); + return _classFactory.createConnectionClass(_url, ConnectionType.TCP); + } + + public void handleConnectionNegotiation(AMQPConnection con) throws Exception + { + StateHelper stateHelper = new StateHelper(); + _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, stateHelper); + _classFactory.getStateManager().addListener(AMQPStateType.CHANNEL_STATE, stateHelper); + + //ConnectionStartBody + ConnectionStartBody connectionStartBody = con.openTCPConnection(); + _major = connectionStartBody.getMajor(); + _minor = connectionStartBody.getMinor(); + + FieldTable clientProperties = FieldTableFactory.newFieldTable(); + clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id + + final String locales = new String(connectionStartBody.getLocales(), "utf8"); + final StringTokenizer tokenizer = new StringTokenizer(locales, " "); + + final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); + + SaslClient sc = Sasl.createSaslClient(new String[] + { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url)); + + ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString( + tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); + // ConnectionSecureBody + AMQMethodBody body = con.startOk(connectionStartOkBody); + ConnectionTuneBody connectionTuneBody; + + if (body instanceof ConnectionSecureBody) + { + ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body; + ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc + .evaluateChallenge(connectionSecureBody.getChallenge())); + //Assuming the server is not going to send another challenge + connectionTuneBody = (ConnectionTuneBody) con.secureOk(connectionSecureOkBody); + + } + else + { + connectionTuneBody = (ConnectionTuneBody) body; + } + + // Using broker supplied values + ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(), + connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat()); + con.tuneOk(connectionTuneOkBody); + + ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url + .getVirtualHost())); + + ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody); + } + + public void handleChannelNegotiation() throws Exception + { + AMQPChannel channel = _classFactory.createChannelClass(_channel); + + ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1")); + ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody); + + //lets have some fun + ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false); + + ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody); + System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend")); + + channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true); + channelFlowOkBody = channel.flow(channelFlowBody); + System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend")); + } + + public void createExchange() throws Exception + { + AMQPExchange exchange = _classFactory.createExchangeClass(_channel); + + ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(_major, _minor, null, // arguments + false,//auto delete + false,// durable + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), true, //internal + false,// nowait + false,// passive + _ticket, new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)); + + AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange"); + exchange.declare(exchangeDeclareBody, cb); + // Blocking for response + while (!cb.isComplete()) + { + } + } + + public void createAndBindQueue() throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(_major, _minor, null, //arguments + false,//auto delete + false,// durable + false, //exclusive, + false, //nowait, + false, //passive, + new AMQShortString("MyTestQueue"), 0); + + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) body; + System.out.println("[Broker has created the queue, " + "message count " + queueDeclareOkBody.getMessageCount() + "consumer count " + + queueDeclareOkBody.getConsumerCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.declare(queueDeclareBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + QueueBindBody queueBindBody = QueueBindBody.createMethodBody(_major, _minor, null, //arguments + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange + false, //nowait + new AMQShortString("MyTestQueue"), //queue + new AMQShortString("RH"), //routingKey + 0 //ticket + ); + + cb = createCallBackWithMessage("Broker has bound the queue"); + queue.bind(queueBindBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + } + + public void purgeQueue() throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(_major, _minor, false, //nowait + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody) body; + System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.purge(queuePurgeBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + } + + public void deleteQueue() throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(_major, _minor, false, //ifEmpty + false, //ifUnused + false, //nowait + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody) body; + System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.delete(queueDeleteBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + } + + public void publishAndSubscribe() throws Exception + { + AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper()); + MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, new AMQShortString("myClient"),// destination + false, //exclusive + null, //filter + false, //noAck, + false, //noLocal, + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume"); + message.consume(messageConsumeBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + // Sending 5 messages serially + for (int i = 0; i < 5; i++) + { + cb = createCallBackWithMessage("Broker has accepted msg " + i); + message.transfer(createMessages("Test" + i), cb); + while (!cb.isComplete()) + { + } + } + + MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient")); + + AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel"); + message.cancel(messageCancelBody, cb2); + } - catch (Exception e) + + private MessageTransferBody createMessages(String content) throws Exception + { + FieldTable headers = FieldTableFactory.newFieldTable(); + headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + ""); + + MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(_major, _minor, new AMQShortString("testApp"), //appId + headers, //applicationHeaders + new Content(Content.TypeEnum.INLINE_T, content.getBytes()), //body + new AMQShortString(""), //contentEncoding, + new AMQShortString("text/plain"), //contentType + new AMQShortString("testApp"), //correlationId + (short) 1, //deliveryMode non persistant + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange + 0l, //expiration + false, //immediate + false, //mandatory + new AMQShortString(UUID.randomUUID().toString()), //messageId + (short) 0, //priority + false, //redelivered + new AMQShortString("RH"), //replyTo + new AMQShortString("RH"), //routingKey, + "abc".getBytes(), //securityToken + 0, //ticket + System.currentTimeMillis(), //timestamp + new AMQShortString(""), //transactionId + 0l, //ttl, + new AMQShortString("Hello") //userId + ); + + return messageTransferBody; + + } + + public void publishAndGet() throws Exception + { + AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper()); + AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5"); + + MessageGetBody messageGetBody = MessageGetBody.createMethodBody(_major, _minor, new AMQShortString("myClient"), false, //noAck + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper()); + message.transfer(createMessages("Test"), cb); + while (!cb.isComplete()) + { + } + + cb = createCallBackWithMessage("Broker has accepted get"); + message.get(messageGetBody, cb); + } + + // Creates a gneric call back and prints the given message + private AMQPCallBack createCallBackWithMessage(final String msg) + { + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + System.out.println(msg); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + return cb; + } + + public static void main(String[] args) { - e.printStackTrace(); + TestClient test = new TestClient(); + try + { + AMQPConnection con = test.openConnection(); + test.handleConnectionNegotiation(con); + test.handleChannelNegotiation(); + test.createExchange(); + test.createAndBindQueue(); + test.publishAndSubscribe(); + test.purgeQueue(); + test.publishAndGet(); + test.deleteQueue(); + } + catch (Exception e) + { + e.printStackTrace(); + } } - } }
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java?view=diff&rev=524144&r1=524143&r2=524144 ============================================================================== --- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java (original) +++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java Fri Mar 30 08:53:18 2007 @@ -38,7 +38,8 @@ public String toString() { - return "AMQState: id = " + _id + " name: " + _name; + //return "AMQState: id = " + _id + " name: " + _name; + return _name; // looks better with loggin } // Connection state Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java?view=auto&rev=524144 ============================================================================== --- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java (added) +++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java Fri Mar 30 08:53:18 2007 @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +public class AMQPStateChangedEvent +{ + private AMQPState _oldState; + + private AMQPState _newState; + + private AMQPStateType _stateType; + + public AMQPStateChangedEvent(AMQPState oldState, AMQPState newState, AMQPStateType stateType) + { + _oldState = oldState; + _newState = newState; + _stateType = stateType; + } + + public AMQPState getNewState() + { + return _newState; + } + + public void setNewState(AMQPState newState) + { + this._newState = newState; + } + + public AMQPState getOldState() + { + return _oldState; + } + + public void setOldState(AMQPState oldState) + { + this._oldState = oldState; + } + + public AMQPStateType getStateType() + { + return _stateType; + } + + public void setStateType(AMQPStateType stateType) + { + this._stateType = stateType; + } + +} Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java?view=diff&rev=524144&r1=524143&r2=524144 ============================================================================== --- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java (original) +++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java Fri Mar 30 08:53:18 2007 @@ -4,5 +4,5 @@ public interface AMQPStateListener { - public void stateChanged(AMQPState oldState, AMQPState newState) throws AMQPException; + public void stateChanged(AMQPStateChangedEvent event) throws AMQPException; } Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java?view=diff&rev=524144&r1=524143&r2=524144 ============================================================================== --- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java (original) +++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java Fri Mar 30 08:53:18 2007 @@ -1,11 +1,13 @@ package org.apache.qpid.nclient.amqp.state; import org.apache.qpid.AMQException; +import org.apache.qpid.nclient.core.AMQPException; public interface AMQPStateManager { - - public void addListener(AMQPStateListener l)throws AMQException; + public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; + + public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; - public void removeListener(AMQPStateListener l)throws AMQException; + public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException; } Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java?view=diff&rev=524144&r1=524143&r2=524144 ============================================================================== --- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java (original) +++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java Fri Mar 30 08:53:18 2007 @@ -40,7 +40,7 @@ public String toString() { - return "AMQState: id = " + _typeId + " name: " + _typeName; + return _typeName; } // Connection state Modified: incubator/qpid/branches/client_restructure/java/pom.xml URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/pom.xml?view=diff&rev=524144&r1=524143&r2=524144 ============================================================================== --- incubator/qpid/branches/client_restructure/java/pom.xml (original) +++ incubator/qpid/branches/client_restructure/java/pom.xml Fri Mar 30 08:53:18 2007 @@ -130,6 +130,7 @@ <module>common</module> <module>broker</module> <module>client</module> + <module>newclient</module> <module>cluster</module> <module>systests</module> <module>perftests</module> @@ -382,7 +383,7 @@ <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> - <version>1.2</version> + <version>1.3</version> </dependency> <dependency> <groupId>commons-lang</groupId> @@ -471,6 +472,11 @@ <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-newclient</artifactId> <version>${project.version}</version> </dependency> <dependency>
