I think you need to step back and establish the reliability guarantees
you need first. On the one hand you are looking for exceptions to be
indicated per send, and on the other you are forcing async sends via
the synchronous send method, which doesnt guarantee reliability at its
most basic level.

It feels more like you should be using synchronous sends, or if you
need additional perf and cant use multiple producers to get it, then
batching asynchronous sends with the completion listener and actually
monitoring the results.

On 8 June 2018 at 12:24, akabhishek1 <mailbox.abhishek.ku...@gmail.com> wrote:
> Hi Robbie,
>
> Thanks a lot for your reply and clarification. You are right for threading
> issue on "onException()". I am still doing R&D with your suggestion, i will
> come back to you shortly with results.
>
> Today morning at 10:55AM(UTC+1), i have taken master code and installed
> locally "0.33.0-SNAPSHOT" version.
>
> While doing research, i found one issue related with message loss and not
> exposing error to application. Which i think, you need to take a look before
> releasing of "0.33.0" version.
>
> Issue - Message are getting loss while connection drop and errors are not
> bubbling up at application level for failover connection.
>
> Steps to reproduce issue(Note - sending message asynchronously without
> CompletionListener with failover connection)
> 1. Change valid QueueName, SBUS_NAME, USERNAME and PASSWORD in below
> example.
> 2. Run java application
> 3. Every 5s("due to Thread.sleep(5000)") one message will publish at
> ServiceBus.
> 4. Disconnect Lan cable/internet after publishing 1 or 2 message. So there
> is no connection between ServiceBus from your desktop/Laptop.
> 5. Wait for 3-4 minute, you can see that, “for loop” continue sending for
> next 4 message.
>    After sending 4 message, for loop will be in suspended/wait status
> (thread will be in suspended/wait status). So no next processing is going
> on.
>
> 6. Wait for 5 minute
>
> 7. After that connect cable/internet. So After connecting to internet, for
> loop will start sending message(thread resumed also)
> 8. You can wait for 1-2 minute until you see message "**** All message sent
> successfully ****"
> 9. At the end of processing, we are expecting 30 message should be available
> on Queue.
>    ISSUE - But we can see only 26 messages on queue with no exception at
> application level.
>            SO we are having loss of 4 messages. I waited upto 30m but still
> not received loss messages at Queue.
>
> NOTE: - I have also tested without connection failover. I am getting
> exception on "onException()" but having loss of 4 message. I think we can’t
> do anything here because we don’t have failover.
>
> Could you please take a look on this issue and let me know for any queries.
>
> //// CODE ////
> package org.test;
>
> import java.util.Hashtable;
>
> import javax.jms.CompletionListener;
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Destination;
> import javax.jms.ExceptionListener;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.naming.Context;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> import org.mule.transport.servicebus.exception.ServicebusSendException;
>
> public class TestQpidSend implements ExceptionListener{
>
>         private static final String QUEUE_NAME = "XXXXXXXX";
>         private static final String SBUS_NAME = "XXXXXXXX";
>         private static final String USERNAME = "XXXXXXXX";
>         private static final String PASSWORD = "XXXXXXXX";
>         private static final String QPID_CONNECTION_FACTORY_CLASS =
> "org.apache.qpid.jms.jndi.JmsInitialContextFactory";
>
>         SendCompletionHandler sendCompletionHandler = new 
> SendCompletionHandler();
>
>         public static void main(String[] args) throws Exception {
>                 TestQpidSend test = new TestQpidSend();
>                 test.send();
>
>                 //Loop for not terminating the application
>                 int i = 0;
>                 while(i > 0){
>                         i++;
>                 }
>         }
>
>         /* Enable constructor, if you are running in any logger for server 
> logs */
>
>          /*public TestQpidRcvr() throws NamingException, JMSException {
>                 System.err.println("***** I am in ****");
>                 startListning();
>         }*/
>
>         private void send() throws NamingException, JMSException,
> InterruptedException {
>                 Hashtable<String, String> hashtable = new Hashtable<>();
>                 hashtable.put("connectionfactory.SBCF", "failover:(amqps://"+ 
> SBUS_NAME
> +".servicebus.windows.net?transport.tcpKeepAlive=true&amqp.traceFrames=true)?failover.reconnectDelay=2000&failover.maxReconnectAttempts=-1&failover.warnAfterReconnectAttempts=10&failover.startupMaxReconnectAttempts=3&jms.prefetchPolicy.all=1000&jms.forceAsyncSend=true");
>
>                 //hashtable.put("connectionfactory.SBCF", "amqps://"+ 
> SBUS_NAME
> +".servicebus.windows.net?transport.tcpKeepAlive=true&amqp.traceFrames=true&jms.prefetchPolicy.all=1000&jms.forceAsyncSend=true");
>
>                 hashtable.put(Context.INITIAL_CONTEXT_FACTORY,
> QPID_CONNECTION_FACTORY_CLASS);
>
>                 Context context = new InitialContext(hashtable);
>                 ConnectionFactory connectionFactory = (ConnectionFactory)
> context.lookup("SBCF");
>
>                 Connection connection = 
> connectionFactory.createConnection(USERNAME,
> PASSWORD);
>                 connection.setExceptionListener(this); // Settted 
> ExceptionListener
>                 connection.start();
>
>                 Session session = connection.createSession(false,
> Session.CLIENT_ACKNOWLEDGE);
>
>                 System.out.println("createSession :: " + session);
>
>                 Destination destination = session.createQueue(QUEUE_NAME);
>                 //System.out.println("**** Destination created ****");
>
>                 MessageProducer messageProducer = 
> session.createProducer(destination);
>
>
>
>                 for(int i=0; i < 30; i++){
>
>                         TextMessage textMessage = 
> session.createTextMessage("Hello");
>
>                         System.out.println("**** sending ****");
>
>                         messageProducer.send(textMessage);
>
>                         System.out.println("**** sent ****");
>
>                         Thread.sleep(5000);
>                 }
>
>                 System.out.println("**** All message sent successfully ****");
>         }
>
>
>         @Override
>         public void onException(JMSException exception) {
>                 System.err.println("*** onException :: connection exception 
> message ::
> ***" + exception.getMessage());
>                 exception.printStackTrace();
>
>         }
>
> }
>
>
>
>
> --
> Sent from: http://qpid.2158936.n2.nabble.com/Apache-Qpid-users-f2158936.html
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> For additional commands, e-mail: users-h...@qpid.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to