Hi Robbie,
Thank you so much for your analysis. I can confirm that there is an issue
in the Qpid library; I am able to reproduce it.
Please find below the steps to reproduce this issue:
1. Change the values of "SBUS_NAME, USERNAME, PASSWORD" in the class below.
2. Run the class below.
3. Send one message to the queue to verify that the listener is working.
If it is working, then
4. Delete the queue from Service Bus - this forces the link to be detached.
When the queue is deleted, we can see the AMQP detach error in the frame
trace, but no error is delivered to the exceptionListener.
As the log shows, nothing is reported to the "onException()" callback. If
the error is never reported there, we cannot re-register the listener.
Please find the error log below.
Please share your opinion, and correct me if I am making a mistake.
Error Log
TRACE 2018-06-01 18:15:38,123 [AmqpProvider
:(1):[amqps://XXXXXXX.servicebus.windows.net:-1]]
org.apache.qpid.jms.provider.amqp.FRAMES: RECV: Detach{handle=0,
closed=true, error=Error{condition=amqp:link:detach-forced, description='The
link
'G2:71211:qpid-jms:receiver:ID:51428778-b922-4612-9d58-1343134d388a:1:1:1:test-rcvr'
is force detached by the broker due to errors occurred in consumer(link40).
Detach origin: InnerMessageReceiver was closed.
TrackingId:d1fcb94d000001bf000000285b117efe_G2_B3,
SystemTracker:XXXXXXX:Queue:test-rcvr, Timestamp:6/1/2018 5:15:38 PM',
info=null}}
TRACE 2018-06-01 18:15:38,123 [AmqpProvider
:(1):[amqps://XXXXXXX.servicebus.windows.net:-1]]
org.apache.qpid.jms.provider.amqp.FRAMES: SENT: Detach{handle=0,
closed=true, error=null}
/////// Sample Code ///////
package com.service.connector.example.test;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class TestQpidRcvr implements MessageListener, ExceptionListener{
private static final String SBUS_NAME = "XXXXXXX";
private static final String USERNAME = "XXXXXXX";
private static final String PASSWORD = "XXXXXXX";
private static final String QPID_CONNECTION_FACTORY_CLASS =
"org.apache.qpid.jms.jndi.JmsInitialContextFactory";
public static void main(String[] args) throws Exception {
TestQpidRcvr test = new TestQpidRcvr();
test.startListning();
//Loop for not terminating the application
int i = 0;
while(i > 0){
i++;
}
}
/* Enable constructor, if you are running in any logger for server logs
public TestQpidRcvr() throws NamingException, JMSException {
System.err.println("***** I am in ****");
startListning();
}*/
private void startListning() throws NamingException, JMSException {
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "failover:(amqps://"+
SBUS_NAME
+".servicebus.windows.net?transport.tcpKeepAlive=true&amqp.traceFrames=true)?failover.reconnectDelay=2000&failover.maxReconnectAttempts=-1&failover.warnAfterReconnectAttempts=10&failover.startupMaxReconnectAttempts=3&jms.prefetchPolicy.all=1000&jms.forceAsyncSend=true");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY,
QPID_CONNECTION_FACTORY_CLASS);
Context context = new InitialContext(hashtable);
ConnectionFactory connectionFactory = (ConnectionFactory)
context.lookup("SBCF");
Connection connection =
connectionFactory.createConnection(USERNAME,
PASSWORD);
connection.setExceptionListener(this); // Settted
ExceptionListener
connection.start();
Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
System.out.println("createSession :: " + session);
System.out.println("**** I am connected ****");
Destination destination = session.createQueue("test-rcvr");
System.out.println("**** Destination created ****");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
System.out.println("**** Published successfully ****");
}
@Override
public void onMessage(javax.jms.Message message) {
try {
System.out.println("Received Message :: " +
message.getJMSMessageID());
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void onException(JMSException exception) {
System.err.println("*** onException :: exception message ::
***" +
exception.getMessage());
exception.printStackTrace();
}
}
--
Sent from: http://qpid.2158936.n2.nabble.com/Apache-Qpid-users-f2158936.html
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]