Firing the ExceptionListener for the connection when a consumer with a MessageListener is remotely closed is something that was added and tested only from 0.32.0 onward. Its working under test but its possible in other scenarios somethings preventing it happening in certain cases, it would need looking at.
Random aside, I find their use of "amqp:link:detach-forced" a bit odd in the case you've described given its seeming suitability for the "amqp:resource-deleted" error. On 1 June 2018 at 18:44, akabhishek1 <[email protected]> wrote: > Hi Robbie, > > Thank you so much for your analysis. I can confirm that there is some issue > in Qpid library. I am able to reproduced this issue. > > Please find below steps to reproduce this issue > > 1. Change value of "SBUS_NAME, USERNAME, PASSWORD" in below class > 2. Run the below classe > 3. Send one message on queue to verify the listener is working fine. If > running fine then > 4. Delete the Queue from ServiceBus - this will enforce to detach the link. > > If you delete the queue then we can see AMQP detach error but not error from > exceptionListener. > We can see that, there are no issues pop up in "onException()" operation. if > issue does not pop up then we can't re-register listener. Please find below > error log. > > Please suggest your opinion and correct me if i am doing any mistake. > > Error Log > > TRACE 2018-06-01 18:15:38,123 [AmqpProvider > :(1):[amqps://XXXXXXX.servicebus.windows.net:-1]] > org.apache.qpid.jms.provider.amqp.FRAMES: RECV: Detach{handle=0, > closed=true, error=Error{condition=amqp:link:detach-forced, description='The > link > 'G2:71211:qpid-jms:receiver:ID:51428778-b922-4612-9d58-1343134d388a:1:1:1:test-rcvr' > is force detached by the broker due to errors occurred in consumer(link40). > Detach origin: InnerMessageReceiver was closed. > TrackingId:d1fcb94d000001bf000000285b117efe_G2_B3, > SystemTracker:XXXXXXX:Queue:test-rcvr, Timestamp:6/1/2018 5:15:38 PM', > info=null}} > TRACE 2018-06-01 18:15:38,123 [AmqpProvider > :(1):[amqps://XXXXXXX.servicebus.windows.net:-1]] > org.apache.qpid.jms.provider.amqp.FRAMES: SENT: Detach{handle=0, > closed=true, error=null} > > > /////// Sample Code /////// > package com.service.connector.example.test; > > import java.util.Hashtable; > > import javax.jms.Connection; > import javax.jms.ConnectionFactory; > import javax.jms.Destination; > import javax.jms.ExceptionListener; > import javax.jms.JMSException; > import javax.jms.MessageConsumer; > import javax.jms.MessageListener; > import javax.jms.Session; > import javax.naming.Context; > import javax.naming.InitialContext; > import javax.naming.NamingException; > > public class TestQpidRcvr implements MessageListener, ExceptionListener{ > > private static final String SBUS_NAME = "XXXXXXX"; > private static final String USERNAME = "XXXXXXX"; > private static final String PASSWORD = "XXXXXXX"; > private static final String QPID_CONNECTION_FACTORY_CLASS = > "org.apache.qpid.jms.jndi.JmsInitialContextFactory"; > > public static void main(String[] args) throws Exception { > TestQpidRcvr test = new TestQpidRcvr(); > test.startListning(); > > //Loop for not terminating the application > int i = 0; > while(i > 0){ > i++; > } > } > > /* Enable constructor, if you are running in any logger for server > logs > > public TestQpidRcvr() throws NamingException, JMSException { > System.err.println("***** I am in ****"); > startListning(); > }*/ > > private void startListning() throws NamingException, JMSException { > Hashtable<String, String> hashtable = new Hashtable<>(); > hashtable.put("connectionfactory.SBCF", "failover:(amqps://"+ > SBUS_NAME > +".servicebus.windows.net?transport.tcpKeepAlive=true&amqp.traceFrames=true)?failover.reconnectDelay=2000&failover.maxReconnectAttempts=-1&failover.warnAfterReconnectAttempts=10&failover.startupMaxReconnectAttempts=3&jms.prefetchPolicy.all=1000&jms.forceAsyncSend=true"); > > hashtable.put(Context.INITIAL_CONTEXT_FACTORY, > QPID_CONNECTION_FACTORY_CLASS); > > Context context = new InitialContext(hashtable); > ConnectionFactory connectionFactory = (ConnectionFactory) > context.lookup("SBCF"); > > Connection connection = > connectionFactory.createConnection(USERNAME, > PASSWORD); > connection.setExceptionListener(this); // Settted > ExceptionListener > connection.start(); > > Session session = connection.createSession(false, > Session.CLIENT_ACKNOWLEDGE); > > System.out.println("createSession :: " + session); > System.out.println("**** I am connected ****"); > > Destination destination = session.createQueue("test-rcvr"); > System.out.println("**** Destination created ****"); > > MessageConsumer consumer = > session.createConsumer(destination); > > consumer.setMessageListener(this); > > System.out.println("**** Published successfully ****"); > } > > > @Override > public void onMessage(javax.jms.Message message) { > try { > System.out.println("Received Message :: " + > message.getJMSMessageID()); > message.acknowledge(); > } catch (JMSException e) { > e.printStackTrace(); > } > } > > @Override > public void onException(JMSException exception) { > System.err.println("*** onException :: exception message :: > ***" + > exception.getMessage()); > exception.printStackTrace(); > > } > > } > > > > > -- > Sent from: http://qpid.2158936.n2.nabble.com/Apache-Qpid-users-f2158936.html > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [email protected] > For additional commands, e-mail: [email protected] > --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
