Author: rajith
Date: Mon Sep 24 18:55:25 2007
New Revision: 579033
URL: http://svn.apache.org/viewvc?rev=579033&view=rev
Log:
Changed the ExceptionListener to ClosedListerner that notifies a close event.
Rafi and I decided against throwing an exception as the close (connection or
session) can be called by a peer even when there is no error.
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
Removed:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=579033&r1=579032&r2=579033&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Mon Sep 24 18:55:25 2007
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,7 @@
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.transport.Option;
@@ -91,7 +92,7 @@
// create the qpid session with an expiry <= 0 so that the session
does not expire
_qpidSession = qpidConnection.createSession(0);
// set the exception listnere for this session
- _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
+ _qpidSession.setClosedListener(new QpidSessionExceptionListener());
// set transacted if required
if (_transacted)
{
@@ -247,7 +248,7 @@
RangeSet ranges = new RangeSet();
for (long messageTag : _unacknowledgedMessageTags)
{
- // release this message
+ // release this message
ranges.add(messageTag);
}
getQpidSession().messageRelease(ranges);
@@ -442,15 +443,15 @@
/**
* Lstener for qpid protocol exceptions
*/
- private class QpidSessionExceptionListener implements
org.apache.qpidity.nclient.ExceptionListener
+ private class QpidSessionExceptionListener implements
org.apache.qpidity.nclient.ClosedListener
{
- public void onException(QpidException exception)
+ public void onClosed(ErrorCode errorCode, String reason)
{
synchronized (this)
{
//todo check the error code for finding out if we need to
notify the
// JMS connection exception listener
- _currentException = exception;
+ _currentException = new QpidException(reason,errorCode,null);
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=579033&r1=579032&r2=579033&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
Mon Sep 24 18:55:25 2007
@@ -25,7 +25,7 @@
{
private AtomicInteger _channelNo = new AtomicInteger();
private Connection _conn;
- private ExceptionListener _exceptionListner;
+ private ClosedListener _closedListner;
private final Lock _lock = new ReentrantLock();
/**
@@ -51,16 +51,19 @@
@Override public void connectionClose(Channel context,
ConnectionClose connectionClose)
{
- // XXX: replaced reference to _exceptionListner with
- // throw new RuntimeException because
- // _exceptionListner may be null. In general this
- // needs to be reworked because not every connection
- // close is an exception!
- throw new RuntimeException
- (new QpidException("Server closed the connection: Reason "
+
- connectionClose.getReplyText(),
-
ErrorCode.get(connectionClose.getReplyCode()),
- null));
+ ErrorCode errorCode =
ErrorCode.get(connectionClose.getReplyCode());
+ if (_closedListner == null && errorCode != ErrorCode.NO_ERROR)
+ {
+ throw new RuntimeException
+ (new QpidException("Server closed the connection:
Reason " +
+ connectionClose.getReplyText(),
+ errorCode,
+ null));
+ }
+ else
+ {
+ _closedListner.onClosed(errorCode,
connectionClose.getReplyText());
+ }
}
};
@@ -125,9 +128,9 @@
return null;
}
- public void setExceptionListener(ExceptionListener exceptionListner)
+ public void setClosedListener(ClosedListener closedListner)
{
- _exceptionListner = exceptionListner;
+ _closedListner = closedListner;
}
}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java?rev=579033&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
Mon Sep 24 18:55:25 2007
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import org.apache.qpidity.ErrorCode;
+
+
+/**
+ * If the communication layer detects a serious problem with a
<CODE>connection</CODE>, it
+ * informs the connection's ExceptionListener
+ */
+public interface ClosedListener
+{
+ /**
+ * If the communication layer detects a serious problem with a connection,
it
+ * informs the connection's ExceptionListener
+ * @param errorCode TODO
+ * @param reason TODO
+ *
+ * @see Connection
+ */
+ public void onClosed(ErrorCode errorCode, String reason);
+}
\ No newline at end of file
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java?rev=579033&r1=579032&r2=579033&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
Mon Sep 24 18:55:25 2007
@@ -82,5 +82,5 @@
* @param exceptionListner The execptionListener
*/
- public void setExceptionListener(ExceptionListener exceptionListner);
+ public void setClosedListener(ClosedListener exceptionListner);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=579033&r1=579032&r2=579033&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
Mon Sep 24 18:55:25 2007
@@ -610,5 +610,5 @@
*
* @param exceptionListner The execptionListener
*/
- public void setExceptionListener(ExceptionListener exceptionListner);
+ public void setClosedListener(ClosedListener exceptionListner);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=579033&r1=579032&r2=579033&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
Mon Sep 24 18:55:25 2007
@@ -10,7 +10,7 @@
import org.apache.qpidity.transport.Range;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.nclient.MessagePartListener;
/**
@@ -19,7 +19,7 @@
public class ClientSession extends org.apache.qpidity.transport.Session
implements org.apache.qpidity.nclient.DtxSession
{
private Map<String,MessagePartListener> _messageListeners = new
HashMap<String,MessagePartListener>();
- private ExceptionListener _exceptionListner;
+ private ClosedListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
@@ -92,7 +92,7 @@
_messageListeners.put(destination, listener);
}
- public void setExceptionListener(ExceptionListener exceptionListner)
+ public void setClosedListener(ClosedListener exceptionListner)
{
_exceptionListner = exceptionListner;
}
@@ -109,7 +109,7 @@
void notifyException(QpidException ex)
{
- _exceptionListner.onException(ex);
+ _exceptionListner.onClosed(null, null);
}
Map<String,MessagePartListener> getMessageListerners()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java?rev=579033&r1=579032&r2=579033&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
Mon Sep 24 18:55:25 2007
@@ -1,10 +1,10 @@
package org.apache.qpidity.nclient.impl;
-import org.apache.qpidity.QpidException;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
-import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
@@ -24,7 +24,7 @@
System.out.println(m.toString());
System.out.println("================== End Msg
==================\n");
}
-
+
});
}
@@ -36,19 +36,19 @@
}catch(Exception e){
e.printStackTrace();
}
-
+
Session ssn = conn.createSession(50000);
- ssn.setExceptionListener(new ExceptionListener()
+ ssn.setClosedListener(new ClosedListener()
{
- public void onException(QpidException e)
+ public void onClosed(ErrorCode errorCode, String reason)
{
- System.out.println(e);
+ System.out.println("ErrorCode : " + errorCode + "
reason : " + reason);
}
});
ssn.queueDeclare("queue1", null, null);
ssn.queueBind("queue1", "amq.direct", "queue1",null);
ssn.sync();
-
+
ssn.messageSubscribe("queue1", "myDest", (short)0,
(short)0,createAdapter(), null);
// queue
@@ -63,21 +63,21 @@
ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
ssn.endData();
ssn.sync();
-
+
// topic subs
ssn.messageSubscribe("topic1", "myDest2", (short)0,
(short)0,createAdapter(), null);
ssn.messageSubscribe("topic2", "myDest3", (short)0,
(short)0,createAdapter(), null);
ssn.messageSubscribe("topic3", "myDest4", (short)0,
(short)0,createAdapter(), null);
ssn.sync();
-
+
ssn.queueDeclare("topic1", null, null);
- ssn.queueBind("topic1", "amq.topic", "stock.*",null);
+ ssn.queueBind("topic1", "amq.topic", "stock.*",null);
ssn.queueDeclare("topic2", null, null);
ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
ssn.queueDeclare("topic3", null, null);
ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
ssn.sync();
-
+
// topic
ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
ssn.data("Topic message");
@@ -85,5 +85,5 @@
ssn.endData();
ssn.sync();
}
-
+
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java?rev=579033&r1=579032&r2=579033&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
Mon Sep 24 18:55:25 2007
@@ -2,11 +2,11 @@
import java.io.FileInputStream;
-import org.apache.qpidity.QpidException;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
-import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.FileMessage;
import org.apache.qpidity.nclient.util.MessageListener;
@@ -27,7 +27,7 @@
System.out.println(m.toString());
System.out.println("================== End Msg
==================\n");
}
-
+
});
}
@@ -39,19 +39,19 @@
}catch(Exception e){
e.printStackTrace();
}
-
+
Session ssn = conn.createSession(50000);
- ssn.setExceptionListener(new ExceptionListener()
+ ssn.setClosedListener(new ClosedListener()
{
- public void onException(QpidException e)
+ public void onClosed(ErrorCode errorCode, String reason)
{
- System.out.println(e);
+ System.out.println("ErrorCode : " + errorCode + "
reason : " + reason);
}
});
ssn.queueDeclare("queue1", null, null);
ssn.queueBind("queue1", "amq.direct", "queue1",null);
ssn.sync();
-
+
ssn.messageSubscribe("queue1", "myDest", (short)0,
(short)0,createAdapter(), null);
try
@@ -60,7 +60,7 @@
1024,
new
DeliveryProperties().setRoutingKey("queue1"),
new
MessageProperties().setMessageId("123"));
-
+
// queue
ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
ssn.sync();
@@ -70,5 +70,5 @@
e.printStackTrace();
}
}
-
+
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java?rev=579033&r1=579032&r2=579033&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
Mon Sep 24 18:55:25 2007
@@ -1,19 +1,18 @@
package org.apache.qpidity.nclient.interop;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
-import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.ExchangeQueryResult;
-import org.apache.qpidity.transport.Future;
import org.apache.qpidity.transport.RangeSet;
-public class BasicInteropTest implements ExceptionListener
+public class BasicInteropTest implements ClosedListener
{
private Session session;
@@ -103,6 +102,7 @@
session.messageFlowMode("myDest", Session.MESSAGE_FLOW_MODE_WINDOW);
System.out.println("------- Setting Credit --------");
session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ //session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_BYTE, -1);
}
@@ -112,17 +112,17 @@
session.sync();
}
- public void onException(QpidException e)
+ public void onClosed(ErrorCode errorCode, String reason)
{
System.out.println("------- Broker Notified an error --------");
- System.out.println("------- " + e.getErrorCode() + " --------");
- System.out.println("------- " + e.getMessage() + " --------");
+ System.out.println("------- " + errorCode + " --------");
+ System.out.println("------- " + reason + " --------");
System.out.println("------- /Broker Notified an error --------");
}
public static void main(String[] args) throws QpidException
{
- String host = "0.0.0.0";
+ /*String host = "0.0.0.0";
if (args.length>0)
{
host = args[0];
@@ -137,5 +137,8 @@
t.testSendMessage();
t.testMessageFlush();
t.close();
+ */
+
+ System.out.print(Integer.toHexString(-1));
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java?rev=579033&r1=579032&r2=579033&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
Mon Sep 24 18:55:25 2007
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpidity.njms.message.*;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.transport.RangeSet;
@@ -155,7 +156,7 @@
// create the qpid session with an expiry <= 0 so that the session
does not expire
_qpidSession = _connection.getQpidConnection().createSession(0);
// set the exception listnere for this session
- _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
+ _qpidSession.setClosedListener(new QpidSessionExceptionListener());
// set transacted if required
if (_transacted && !isXA)
{
@@ -468,7 +469,7 @@
RangeSet ranges = new RangeSet();
for (QpidMessage message : _unacknowledgedMessages)
{
- // release this message
+ // release this message
ranges.add(message.getMessageTransferId());
}
getQpidSession().messageRelease(ranges);
@@ -1160,15 +1161,15 @@
/**
* Lstener for qpid protocol exceptions
*/
- private class QpidSessionExceptionListener implements
org.apache.qpidity.nclient.ExceptionListener
+ private class QpidSessionExceptionListener implements
org.apache.qpidity.nclient.ClosedListener
{
- public void onException(QpidException exception)
+ public void onClosed(ErrorCode errorCode, String reason)
{
synchronized (this)
{
//todo check the error code for finding out if we need to
notify the
// JMS connection exception listener
- _currentException = exception;
+ _currentException = new QpidException(reason,errorCode,null);
}
}
}