Author: arnaudsimon
Date: Tue Jul 31 06:19:35 2007
New Revision: 561322
URL: http://svn.apache.org/viewvc?view=rev&rev=561322
Log:
improved implementation
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java?view=diff&rev=561322&r1=561321&r2=561322
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
Tue Jul 31 06:19:35 2007
@@ -134,7 +134,6 @@
public String getClientID() throws JMSException
{
checkNotClosed();
-
return _clientID;
}
@@ -225,8 +224,7 @@
// start all the sessions
for (SessionImpl session : _sessions)
{
- //TODO session.start();
- //TODO Exception handling
+ session.start();
}
_started = true;
}
@@ -247,11 +245,10 @@
checkNotClosed();
if (_started)
{
- // start all the sessions
+ // stop all the sessions
for (SessionImpl session : _sessions)
{
- //TODO session.stop();
- //TODO Exception handling
+ session.stop();
}
_started = false;
}
@@ -282,8 +279,7 @@
// close all the sessions
for (SessionImpl session : _sessions)
{
- //TODO session.close();
- //TODO Exception handling
+ session.close();
}
// close the underlaying Qpid connection
try
@@ -337,13 +333,24 @@
//-------------- QueueConnection API
- public QueueSession createQueueSession(boolean b, int i) throws
JMSException
+ /**
+ * Create a QueueSession.
+ *
+ * @param transacted Indicates whether the session is transacted.
+ * @param acknowledgeMode Indicates whether the consumer or the
+ * client will acknowledge any messages it
receives; ignored if the session
+ * is transacted. Legal values are
<code>Session.AUTO_ACKNOWLEDGE</code>,
+ * <code>Session.CLIENT_ACKNOWLEDGE</code> and
<code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+ * @return A queueSession object/
+ * @throws JMSException If creating a QueueSession fails due to some
internal error.
+ */
+ public QueueSession createQueueSession(boolean transacted, int
acknowledgeMode) throws JMSException
{
checkNotClosed();
- //TODO: create a queue session
- QueueSessionImpl queueSession = null;
+ QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted,
acknowledgeMode);
+ // add this session to the list of handled sessions.
_sessions.add(queueSession);
- return null; //To change body of implemented methods use File |
Settings | File Templates.
+ return queueSession;
}
/**
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java?view=diff&rev=561322&r1=561321&r2=561322
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
Tue Jul 31 06:19:35 2007
@@ -18,6 +18,7 @@
package org.apache.qpid.nclient.jms;
import javax.jms.Destination;
+import javax.jms.JMSException;
/**
* Implementation of the JMS Destination interface
@@ -29,13 +30,24 @@
*/
protected String _name = null;
+ /**
+ * The session used to create this destination
+ */
+ protected SessionImpl _session;
+
//--- Constructor
/**
* Create a new DestinationImpl with a given name.
- * @param name The name of this destination
+ *
+ * @param name The name of this destination.
+ * @param session The session used to create this destination.
+ * @throws JMSException If the destiantion name is not valid
*/
- protected DestinationImpl(String name)
+ protected DestinationImpl(SessionImpl session, String name) throws
JMSException
{
+ // TODO validate that this destination name exists
+ //_session.getQpidSession()
+ _session = session;
_name = name;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java?view=diff&rev=561322&r1=561321&r2=561322
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
Tue Jul 31 06:19:35 2007
@@ -29,10 +29,14 @@
//--- Constructor
/**
* Create a new QueueImpl with a given name.
+ *
+ * @param name The name of this queue.
+ * @param session The session used to create this queue.
+ * @throws JMSException If the queue name is not valid
*/
- public QueueImpl(String name)
+ protected QueueImpl(SessionImpl session, String name) throws JMSException
{
- super(name);
+ super(session, name);
}
//---- Interface javax.jms.Queue
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=561322&r1=561321&r2=561322
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
Tue Jul 31 06:19:35 2007
@@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;
import org.apache.qpid.nclient.jms.message.*;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -111,7 +112,7 @@
_acknowledgeMode = acknowledgeMode;
try
{
- // create the qpid session
+ // create the qpid session with an expiry <= 0 so that the
session does not expire
_qpidSession = _connection.getQpidConnection().createSession(0);
// set transacted if required
if (_transacted)
@@ -153,9 +154,8 @@
}
/**
- * Creates a <CODE>Message</CODE> object that holds all the
- * standard message header information. It can be sent when a message
- * containing only header information is sufficient.
+ * Creates a <code>Message</code> object that holds all the standard
message header information.
+ * It can be sent when a message containing only header information is
sufficient.
* We simply return a ByteMessage
*
* @return A Message.
@@ -167,7 +167,7 @@
}
/**
- * Creates an <CODE>ObjectMessage</CODE> used to send a message
+ * Creates an <code>ObjectMessage</code> used to send a message
* that contains a serializable Java object.
*
* @return An ObjectMessage.
@@ -180,7 +180,7 @@
}
/**
- * Creates an initialized <CODE>ObjectMessage</CODE> used to send a
message that contains
+ * Creates an initialized <code>ObjectMessage</code> used to send a
message that contains
* a serializable Java object.
*
* @param serializable The object to use to initialize this message.
@@ -195,7 +195,7 @@
}
/**
- * Creates a <CODE>StreamMessage</CODE> used to send a
+ * Creates a <code>StreamMessage</code> used to send a
* self-defining stream of primitive values in the Java programming
* language.
*
@@ -209,7 +209,7 @@
}
/**
- * Creates a <CODE>TextMessage</CODE> object used to send a message
containing a String.
+ * Creates a <code>TextMessage</code> object used to send a message
containing a String.
*
* @return A TextMessage object
* @throws JMSException If Creating an TextMessage object fails due to
some internal error.
@@ -221,7 +221,7 @@
}
/**
- * Creates an initialized <CODE>TextMessage</CODE> used to send
+ * Creates an initialized <code>TextMessage</code> used to send
* a message containing a String.
*
* @param text The string used to initialize this message.
@@ -320,13 +320,13 @@
/**
* Closes this session.
* <p> The JMS specification says
- * <P> This call will block until a <CODE>receive</CODE> call or message
+ * <P> This call will block until a <code>receive</code> call or message
* listener in progress has completed. A blocked message consumer
- * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
is closed.
+ * <code>receive</code> call returns <code>null</code> when this session
is closed.
* <P>Closing a transacted session must roll back the transaction in
progress.
- * <P>This method is the only <CODE>Session</CODE> method that can be
called concurrently.
- * <P>Invoking any other <CODE>Session</CODE> method on a closed session
- * must throw a <CODE>javax.jms.IllegalStateException</CODE>.
+ * <P>This method is the only <code>Session</code> method that can be
called concurrently.
+ * <P>Invoking any other <code>Session</code> method on a closed session
+ * must throw a <code>javax.jms.IllegalStateException</code>.
* <p> Closing a closed session must <I>not</I> throw an exception.
*
* @throws JMSException If closing the session fails due to some internal
error.
@@ -443,7 +443,7 @@
/**
* Creates a MessageConsumer for the specified destination.
*
- * @param destination The <CODE>Destination</CODE> to access
+ * @param destination The <code>Destination</code> to access
* @return A new MessageConsumer for the specified destination.
* @throws JMSException If the session fails to create a
MessageConsumer due to some internal error.
* @throws InvalidDestinationException If an invalid destination is
specified.
@@ -456,7 +456,7 @@
/**
* Creates a MessageConsumer for the specified destination, using a
message selector.
*
- * @param destination The <CODE>Destination</CODE> to access
+ * @param destination The <code>Destination</code> to access
* @param messageSelector Only messages with properties matching the
message selector expression are delivered.
* @return A new MessageConsumer for the specified destination.
* @throws JMSException If the session fails to create a
MessageConsumer due to some internal error.
@@ -478,7 +478,7 @@
* NoLocal attribute allows a consumer to inhibit the delivery of messages
published by its
* own connection. The default value for this attribute is False.
*
- * @param destination The <CODE>Destination</CODE> to access
+ * @param destination The <code>Destination</code> to access
* @param messageSelector Only messages with properties matching the
message selector expression are delivered.
* @param noLocal If true, and the destination is a topic,
inhibits the delivery of messages published
* by its own connection.
@@ -505,17 +505,16 @@
* The physical creation of queues is an administrative task and is not
* to be initiated by the JMS API. The one exception is the
* creation of temporary queues, which is accomplished with the
- * <CODE>createTemporaryQueue</CODE> method.
+ * <code>createTemporaryQueue</code> method.
*
- * @param queueName the name of this <CODE>Queue</CODE>
- * @return a <CODE>Queue</CODE> with the given name
+ * @param queueName the name of this <code>Queue</code>
+ * @return a <code>Queue</code> with the given name
* @throws JMSException If the session fails to create a queue due to some
internal error.
*/
public Queue createQueue(String queueName) throws JMSException
{
checkNotClosed();
- // todo: check that this destiantion name does exist
- return new QueueImpl(queueName);
+ return new QueueImpl(this, queueName);
}
/**
@@ -528,23 +527,22 @@
* The physical creation of queues is an administrative task and is not
* to be initiated by the JMS API. The one exception is the
* creation of temporary queues, which is accomplished with the
- * <CODE>createTemporaryTopic</CODE> method.
+ * <code>createTemporaryTopic</code> method.
*
- * @param topicName The name of this <CODE>Topic</CODE>
- * @return a <CODE>Topic</CODE> with the given name
+ * @param topicName The name of this <code>Topic</code>
+ * @return a <code>Topic</code> with the given name
* @throws JMSException If the session fails to create a topic due to some
internal error.
*/
public Topic createTopic(String topicName) throws JMSException
{
checkNotClosed();
- // todo: check that this destiantion name does exist
- return new TopicImpl(topicName);
+ return new TopicImpl(this, topicName);
}
/**
* Creates a durable subscriber to the specified topic,
*
- * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to.
+ * @param topic The non-temporary <code>Topic</code> to subscribe to.
* @param name The name used to identify this subscription.
* @return A durable subscriber to the specified topic,
* @throws JMSException If creating a subscriber fails due
to some internal error.
@@ -561,11 +559,11 @@
* Creates a durable subscriber to the specified topic, using a message
selector and specifying whether messages
* published by its
* own connection should be delivered to it.
- * <p> A client can change an existing durable subscription by creating a
durable <CODE>TopicSubscriber</CODE> with
+ * <p> A client can change an existing durable subscription by creating a
durable <code>TopicSubscriber</code> with
* the same name and a new topic and/or message selector. Changing a
durable subscriber is equivalent to
* unsubscribing (deleting) the old one and creating a new one.
*
- * @param topic The non-temporary <CODE>Topic</CODE> to
subscribe to.
+ * @param topic The non-temporary <code>Topic</code> to
subscribe to.
* @param name The name used to identify this subscription.
* @param messageSelector Only messages with properties matching the
message selector expression are delivered.
* @param noLocal If set, inhibits the delivery of messages
published by its own connection
@@ -585,7 +583,7 @@
/**
* Create a QueueBrowser to peek at the messages on the specified queue.
*
- * @param queue The <CODE>Queue</CODE> to browse.
+ * @param queue The <code>Queue</code> to browse.
* @return A QueueBrowser.
* @throws JMSException If creating a browser fails due to
some internal error.
* @throws InvalidDestinationException If an invalid queue is specified.
@@ -598,7 +596,7 @@
/**
* Create a QueueBrowser to peek at the messages on the specified queue
using a message selector.
*
- * @param queue The <CODE>Queue</CODE> to browse.
+ * @param queue The <code>Queue</code> to browse.
* @param messageSelector Only messages with properties matching the
message selector expression are delivered.
* @return A QueueBrowser.
* @throws JMSException If creating a browser fails due to
some internal error.
@@ -620,7 +618,7 @@
*/
public TemporaryQueue createTemporaryQueue() throws JMSException
{
- return new TemporaryQueueImpl();
+ return new TemporaryQueueImpl(this);
}
/**
@@ -631,7 +629,7 @@
*/
public TemporaryTopic createTemporaryTopic() throws JMSException
{
- return new TemporaryTopicImpl();
+ return new TemporaryTopicImpl(this);
}
/**
@@ -641,7 +639,7 @@
* subscriber by its provider.
* <p/>
* <P>It is erroneous for a client to delete a durable subscription
- * while there is an active <CODE>TopicSubscriber</CODE> for the
+ * while there is an active <code>TopicSubscriber</code> for the
* subscription, or while a consumed message is part of a pending
* transaction or has not been acknowledged in the session.
*
@@ -657,6 +655,43 @@
}
//----- Protected methods
+
+ /**
+ * Start the flow of message to this session.
+ *
+ * @throws JMSException If starting the session fails due to some
communication error.
+ */
+ protected void start() throws JMSException
+ {
+ try
+ {
+ // TODO: make sure that the correct options are used
+ _qpidSession.sessionFlow(Option.SUSPEND);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Stop the flow of message to this session.
+ *
+ * @throws JMSException If stopping the session fails due to some
communication error.
+ */
+ protected void stop() throws JMSException
+ {
+ try
+ {
+ // TODO: make sure that the correct options are used
+ _qpidSession.sessionFlow(Option.RESUME);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
/**
* Notify this session that a message is processed
*
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java?view=diff&rev=561322&r1=561321&r2=561322
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
Tue Jul 31 06:19:35 2007
@@ -32,19 +32,35 @@
//--- constructor
- public TemporaryQueueImpl()
+ /**
+ * Create a new TemporaryQueueImpl with a given name.
+ *
+ * @param session The session used to create this TemporaryQueueImpl.
+ * @throws JMSException If creating the TemporaryQueueImpl fails due to
some error.
+ */
+ public TemporaryQueueImpl(SessionImpl session) throws JMSException
{
// temporary destinations do not have names and are not registered in
the JNDI namespace.
- super("NAME_NOT_SET");
+ super(session, "NAME_NOT_SET");
}
//-- TemporaryDestination Interface
+ /**
+ * Specify whether this temporary destination is deleted.
+ *
+ * @return true is this temporary destination is deleted.
+ */
public boolean isdeleted()
{
return _isDeleted;
}
//-- TemporaryTopic Interface
+ /**
+ * Delete this temporary destinaiton
+ *
+ * @throws JMSException If deleting this temporary queue fails due to some
error.
+ */
public void delete() throws JMSException
{
// todo delete this temporary queue
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java?view=diff&rev=561322&r1=561321&r2=561322
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
Tue Jul 31 06:19:35 2007
@@ -32,10 +32,16 @@
private boolean _isDeleted = false;
//--- constructor
- public TemporaryTopicImpl()
+ /**
+ * Create a new TemporaryTopicImpl with a given name.
+ *
+ * @param session The session used to create this TemporaryTopicImpl.
+ * @throws JMSException If creating the TemporaryTopicImpl fails due to
some error.
+ */
+ public TemporaryTopicImpl(SessionImpl session) throws JMSException
{
- // temporary destinations do not have names and are not registered in
the JNDI namespace.
- super("NAME_NOT_SET");
+ // temporary destinations do not have names.
+ super(session, "NAME_NOT_SET");
}
//-- TemporaryDestination Interface
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java?view=diff&rev=561322&r1=561321&r2=561322
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
Tue Jul 31 06:19:35 2007
@@ -18,6 +18,7 @@
package org.apache.qpid.nclient.jms;
import javax.jms.Topic;
+import javax.jms.JMSException;
/**
* Implementation of the javax.jms.Topic interface.
@@ -27,10 +28,14 @@
//--- Constructor
/**
* Create a new TopicImpl with a given name.
+ *
+ * @param name The name of this topic
+ * @param session The session used to create this queue.
+ * @throws JMSException If the topic name is not valid
*/
- public TopicImpl(String name)
+ public TopicImpl(SessionImpl session, String name) throws JMSException
{
- super(name);
+ super(session, name);
}
//--- javax.jsm.Topic Interface