Author: rajith
Date: Fri Jun 1 13:40:34 2007
New Revision: 543602
URL: http://svn.apache.org/viewvc?view=rev&rev=543602
Log:
more enchancements for the Qpid java client. Also I have checked in a sample
client(QpidTestClient) on how to use the qpid java client.
Modified:
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
Modified:
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java?view=diff&rev=543602&r1=543601&r2=543602
==============================================================================
---
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
(original)
+++
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
Fri Jun 1 13:40:34 2007
@@ -1,12 +1,16 @@
package org.apache.qpid.nclient.amqp.sample;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.nclient.api.QpidConnection;
import org.apache.qpid.nclient.api.QpidConstants;
import org.apache.qpid.nclient.api.QpidExchangeHelper;
+import org.apache.qpid.nclient.api.QpidMessageConsumer;
import org.apache.qpid.nclient.api.QpidMessageProducer;
import org.apache.qpid.nclient.api.QpidQueueHelper;
import org.apache.qpid.nclient.api.QpidSession;
import org.apache.qpid.nclient.impl.QpidConnectionImpl;
+import org.apache.qpid.nclient.message.AMQPApplicationMessage;
+import org.apache.qpid.nclient.message.MessageHeaders;
public class QpidTestClient
{
@@ -21,20 +25,32 @@
session.open();
QpidExchangeHelper exchangeHelper =
session.getExchangeHelper();
- exchangeHelper.open();
exchangeHelper.declareExchange(false, false,
QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false,
QpidConstants.DIRECT_EXCHANGE_CLASS);
QpidQueueHelper queueHelper = session.getQueueHelper();
- queueHelper.open();
queueHelper.declareQueue(false, false, false, false,
false, "myQueue");
queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue",
"RH");
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setRoutingKey(new AMQShortString("RH"));
+ msgHeaders.setExchange(new
AMQShortString(QpidConstants.DIRECT_EXCHANGE_NAME));
+ AMQPApplicationMessage msg = new
AMQPApplicationMessage(msgHeaders,"test".getBytes());
+
QpidMessageProducer messageProducer =
session.createProducer();
+ messageProducer.open();
+ messageProducer.send(false, true, msg);
+
+ QpidMessageConsumer messageConsumer =
session.createConsumer("myQueue", false, false);
+ messageConsumer.open();
+
+ AMQPApplicationMessage msg2 = messageConsumer.receive();
+ System.out.println(msg.toString());
}
catch(Exception e)
{
-
+ e.printStackTrace();
}
}
+
}
Modified:
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java?view=diff&rev=543602&r1=543601&r2=543602
==============================================================================
---
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
(original)
+++
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
Fri Jun 1 13:40:34 2007
@@ -7,12 +7,18 @@
* This abstracts the error handling for open
* and close methods for a resource. This class
* eliminates the duplication of error handling
- * code
+ * code.
+ *
+ * This is not thread safe and is only to be used
+ * by a single thread at a time. Session and Connection
+ * have overriden key methods to provide thread safety.
+ *
*/
public abstract class AbstractResource
{
private String _resourceName;
private boolean _closed = true;
+ private boolean _opened = false;
public AbstractResource(String resourceName)
{
@@ -21,31 +27,36 @@
public void open() throws QpidException
{
- _closed = false;
- try
- {
- openResource();
-
- }
- catch(Exception e)
- {
- throw new QpidException("Error creating " +
_resourceName + " due to " + e.getMessage(),e);
+ if(!_opened)
+ {
+ try
+ {
+ openResource();
+ _opened = true;
+ _closed = false;
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Error creating " +
_resourceName + " due to " + e.getMessage(),e);
+ }
}
}
public void close() throws QpidException
{
- _closed = true;
- try
- {
- closeResource();
-
- }
- catch(Exception e)
- {
- throw new QpidException("Error destroying " +
_resourceName + " due to " + e.getMessage(),e);
+ if(!_closed)
+ {
+ try
+ {
+ closeResource();
+ _closed = true;
+ _opened = false;
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Error destroying " +
_resourceName + " due to " + e.getMessage(),e);
+ }
}
-
}
protected abstract void openResource() throws AMQPException,
QpidException;
Modified:
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java?view=diff&rev=543602&r1=543601&r2=543602
==============================================================================
---
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
(original)
+++
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
Fri Jun 1 13:40:34 2007
@@ -57,6 +57,7 @@
import org.apache.qpid.nclient.api.QpidException;
import org.apache.qpid.nclient.api.QpidSession;
import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
import
org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
@@ -89,9 +90,9 @@
private Lock _lock = new ReentrantLock();
- private AtomicBoolean _closed;
+ private AtomicBoolean _closed = new AtomicBoolean(true);
- private AtomicBoolean _opened;
+ private AtomicBoolean _opened = new AtomicBoolean(false);
public QpidConnectionImpl()
{
@@ -164,8 +165,8 @@
try
{
- //_url = new AMQPConnectionURL("amqp://guest:[EMAIL
PROTECTED]/test?brokerlist='tcp://localhost:5672?'");
- _amqpConnection =
_classFactory.createConnectionClass(url, ConnectionType.TCP);
+ _url = new AMQPConnectionURL(url);
+ _amqpConnection =
_classFactory.createConnectionClass(_url, ConnectionType.TCP);
}
catch(Exception e)
{
@@ -181,6 +182,7 @@
throw new QpidException("Connection negotiation failed
due to " + e.getMessage(),e);
}
+ _closed.set(false);
_opened.set(true);
}
Modified:
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java?view=diff&rev=543602&r1=543601&r2=543602
==============================================================================
---
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
(original)
+++
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
Fri Jun 1 13:40:34 2007
@@ -66,8 +66,8 @@
private QpidQueueHelperImpl _qpidQueueHelper;
private QpidMessageHelperImpl _qpidMessageHelper;
private List<QpidMessageProducerImpl> _producers = new
ArrayList<QpidMessageProducerImpl>();
- private AtomicBoolean _closed;
- private AtomicInteger _consumerTag;
+ private AtomicBoolean _closed = new AtomicBoolean(true);
+ private AtomicInteger _consumerTag = new AtomicInteger();
private Lock _sessionCloseLock = new ReentrantLock();
// this will be used as soon as Session class is finalized
@@ -92,6 +92,7 @@
*/
protected void openResource() throws AMQPException, QpidException
{
+ _closed.set(false);
// These methods will be changed to session methods
openChannel();
Modified:
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java?view=diff&rev=543602&r1=543601&r2=543602
==============================================================================
---
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
(original)
+++
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
Fri Jun 1 13:40:34 2007
@@ -35,6 +35,12 @@
private boolean redeliveredFlag;
private MessageHeaders messageHeaders;
+ public AMQPApplicationMessage(MessageHeaders messageHeaders,byte[]
content)
+ {
+ this.messageHeaders = messageHeaders;
+ addContent(content);
+ }
+
public AMQPApplicationMessage(int channelId, byte[] referenceId)
{
this.channelId = channelId;