Author: rajith
Date: Mon Dec 11 08:16:55 2006
New Revision: 485735
URL: http://svn.apache.org/viewvc?view=rev&rev=485735
Log:
This contains a fix for QPID-165 and QPID-166
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=485735&r1=485734&r2=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Mon Dec 11 08:16:55 2006
@@ -54,6 +54,7 @@
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -550,8 +551,8 @@
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
- // TODO Auto-generated method stub
- return null;
+ return QpidConnectionMetaData.instance();
+
}
public ExceptionListener getExceptionListener() throws JMSException
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=485735&r1=485734&r2=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Mon Dec 11 08:16:55 2006
@@ -38,6 +38,7 @@
import javax.jms.*;
import javax.jms.IllegalStateException;
+
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -279,7 +280,7 @@
this(con, channelId, transacted, acknowledgeMode,
MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
defaultPrefetchLow);
}
- AMQConnection getAMQConnection()
+ public AMQConnection getAMQConnection()
{
return _connection;
}
@@ -744,6 +745,7 @@
*/
public QueueReceiver createQueueReceiver(Destination destination) throws
JMSException
{
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
@@ -759,6 +761,7 @@
*/
public QueueReceiver createQueueReceiver(Destination destination, String
messageSelector) throws JMSException
{
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(destination, messageSelector);
@@ -767,17 +770,20 @@
public MessageConsumer createConsumer(Destination destination) throws
JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark,
_defaultPrefetchLowMark, false, false, null);
}
public MessageConsumer createConsumer(Destination destination, String
messageSelector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark,
_defaultPrefetchLowMark, false, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination, String
messageSelector, boolean noLocal)
throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark,
_defaultPrefetchLowMark, noLocal, false, messageSelector);
}
@@ -787,6 +793,7 @@
boolean exclusive,
String selector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, prefetch, prefetch, noLocal,
exclusive, selector, null);
}
@@ -798,6 +805,7 @@
boolean exclusive,
String selector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, prefetchHigh, prefetchLow, noLocal,
exclusive, selector, null);
}
@@ -808,6 +816,7 @@
String selector,
FieldTable rawSelector) throws
JMSException
{
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal,
exclusive,
selector, rawSelector);
}
@@ -820,6 +829,7 @@
String selector,
FieldTable rawSelector) throws
JMSException
{
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow,
noLocal, exclusive,
selector, rawSelector);
}
@@ -1045,6 +1055,7 @@
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer)
createConsumer(dest));
}
@@ -1061,6 +1072,7 @@
public TopicSubscriber createSubscriber(Topic topic, String
messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer)
createConsumer(dest, messageSelector, noLocal));
}
@@ -1075,6 +1087,7 @@
public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic,
_connection.getClientID(), name);
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer)
createConsumer(dest));
}
@@ -1086,6 +1099,7 @@
throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic,
_connection.getClientID(), name);
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector, noLocal);
return new TopicSubscriberAdaptor(dest, consumer);
@@ -1094,6 +1108,7 @@
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
//return (TopicPublisher) createProducer(topic);
return new TopicPublisherAdapter(createProducer(topic), topic);
}
@@ -1101,12 +1116,14 @@
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not
supported");
}
public QueueBrowser createBrowser(Queue queue, String messageSelector)
throws JMSException
{
checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not
supported");
}
@@ -1124,6 +1141,8 @@
public void unsubscribe(String name) throws JMSException
{
+ checkNotClosed();
+
//send a queue.delete for the subscription
String queue = _connection.getClientID() + ":" + name;
AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue,
false, false, true);
@@ -1324,5 +1343,26 @@
_logger.warn("Unsuspending channel");
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
true);
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
+ }
+
+ /*
+ * I could have combined the last 3 methods, but this way it improves
readability
+ */
+ private void checkValidTopic(Topic topic) throws
InvalidDestinationException{
+ if (topic == null){
+ throw new javax.jms.InvalidDestinationException("Invalid
Topic");
+ }
+ }
+
+ private void checkValidQueue(Queue queue) throws
InvalidDestinationException{
+ if (queue == null){
+ throw new javax.jms.InvalidDestinationException("Invalid
Queue");
+ }
+ }
+
+ private void checkValidDestination(Destination destination) throws
InvalidDestinationException{
+ if (destination == null){
+ throw new javax.jms.InvalidDestinationException("Invalid
Queue");
+ }
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=485735&r1=485734&r2=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Mon Dec 11 08:16:55 2006
@@ -544,7 +544,7 @@
this.checkNotClosed();
if(_session == null || _session.isClosed()){
- throw new UnsupportedOperationException("Invalid
Session");
+ throw new javax.jms.IllegalStateException("Invalid
Session");
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=485735&r1=485734&r2=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Mon Dec 11 08:16:55 2006
@@ -30,6 +30,7 @@
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import java.io.UnsupportedEncodingException;
@@ -231,6 +232,7 @@
public void send(Message message) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message,
_deliveryMode, _messagePriority, _timeToLive,
@@ -241,6 +243,7 @@
public void send(Message message, int deliveryMode) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode,
_messagePriority, _timeToLive,
@@ -251,6 +254,7 @@
public void send(Message message, int deliveryMode, boolean immediate)
throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode,
_messagePriority, _timeToLive,
@@ -262,6 +266,7 @@
long timeToLive) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode,
priority, timeToLive, _mandatory,
@@ -272,6 +277,7 @@
public void send(Destination destination, Message message) throws
JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -285,6 +291,7 @@
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -298,6 +305,7 @@
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -311,6 +319,7 @@
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -325,6 +334,7 @@
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -487,17 +497,30 @@
_encoding = encoding;
}
- private void checkPreConditions() throws IllegalStateException,
JMSException {
+ private void checkPreConditions() throws
javax.jms.IllegalStateException, JMSException {
checkNotClosed();
-
+
+ if(_session == null || _session.isClosed()){
+ throw new javax.jms.IllegalStateException("Invalid
Session");
+ }
+ }
+
+ private void checkInitialDestination(){
if(_destination == null){
throw new UnsupportedOperationException("Destination is
null");
}
+ }
+
+ private void checkDestination(Destination suppliedDestination) throws
InvalidDestinationException{
+ if (_destination != null && suppliedDestination != null){
+ throw new UnsupportedOperationException("This message
producer was created with a Destination, therefore you cannot use an
unidentified Destination");
+ }
- if(_session == null || _session.isClosed()){
- throw new UnsupportedOperationException("Invalid
Session");
+ if (suppliedDestination == null){
+ throw new InvalidDestinationException("Supplied
Destination was invalid");
}
}
+
public AMQSession getSession() {
return _session;
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=auto&rev=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
Mon Dec 11 08:16:55 2006
@@ -0,0 +1,50 @@
+package org.apache.qpid.client;
+
+import java.util.Enumeration;
+
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+
+public class QpidConnectionMetaData implements ConnectionMetaData {
+
+ private static QpidConnectionMetaData _instance = new
QpidConnectionMetaData();
+
+ private QpidConnectionMetaData(){
+ }
+
+ public static QpidConnectionMetaData instance(){
+ return _instance;
+ }
+
+ public int getJMSMajorVersion() throws JMSException {
+ return 1;
+ }
+
+ public int getJMSMinorVersion() throws JMSException {
+ return 1;
+ }
+
+ public String getJMSProviderName() throws JMSException {
+ return "Apache Qpid";
+ }
+
+ public String getJMSVersion() throws JMSException {
+ return "1.1";
+ }
+
+ public Enumeration getJMSXPropertyNames() throws JMSException {
+ return null;
+ }
+
+ public int getProviderMajorVersion() throws JMSException {
+ return 0;
+ }
+
+ public int getProviderMinorVersion() throws JMSException {
+ return 9;
+ }
+
+ public String getProviderVersion() throws JMSException {
+ return "Incubating-M1";
+ }
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java?view=diff&rev=485735&r1=485734&r2=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
Mon Dec 11 08:16:55 2006
@@ -103,7 +103,7 @@
AMQSession session = msgConsumer.getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid
Session");
+ throw new javax.jms.IllegalStateException("Invalid
Session");
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?view=diff&rev=485735&r1=485734&r2=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
Mon Dec 11 08:16:55 2006
@@ -52,26 +52,32 @@
}
public int getDeliveryMode() throws JMSException {
+ checkPreConditions();
return delegate.getDeliveryMode();
}
public Destination getDestination() throws JMSException {
+ checkPreConditions();
return delegate.getDestination();
}
public boolean getDisableMessageID() throws JMSException {
+ checkPreConditions();
return delegate.getDisableMessageID();
}
public boolean getDisableMessageTimestamp() throws JMSException {
+ checkPreConditions();
return delegate.getDisableMessageTimestamp();
}
public int getPriority() throws JMSException {
+ checkPreConditions();
return delegate.getPriority();
}
public long getTimeToLive() throws JMSException {
+ checkPreConditions();
return delegate.getTimeToLive();
}
@@ -128,7 +134,7 @@
AMQSession session =
((BasicMessageProducer)delegate).getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid
Session");
+ throw new javax.jms.IllegalStateException("Invalid
Session");
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java?view=diff&rev=485735&r1=485734&r2=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
Mon Dec 11 08:16:55 2006
@@ -132,7 +132,7 @@
AMQSession session =
((BasicMessageProducer)delegate).getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid
Session");
+ throw new javax.jms.IllegalStateException("Invalid
Session");
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?view=diff&rev=485735&r1=485734&r2=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
Mon Dec 11 08:16:55 2006
@@ -116,7 +116,7 @@
AMQSession session = msgConsumer.getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid
Session");
+ throw new javax.jms.IllegalStateException("Invalid
Session");
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=485735&r1=485734&r2=485735
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Mon Dec 11 08:16:55 2006
@@ -384,11 +384,15 @@
}
public void acknowledge() throws JMSException
- {
+ {
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are
ignored when client acknowledge
// is not specified. In our case, we only set the session field where
client acknowledge mode is specified.
if (_session != null)
{
+ if (_session.getAMQConnection().isClosed()){
+ throw new javax.jms.IllegalStateException("Connection
is already closed");
+ }
+
// we set multiple to true here since acknowledgement implies
acknowledge of all previous messages
// received on the session
_session.acknowledgeMessage(_deliveryTag, true);