Now I have to rescind my previous comments.  It turns out that NON_PERSISTENT
messages from clients or the browser console are forwarded and consumed
correctly, but PERSISTENT messages are not.  This is true whether I send
from the console, from Java using the JMS/OpenWire client, or from
JavaScript using rhea/AMQP.  I have turned on TRACE logging on the servers,
but do not see any hints.

Any suggestions?  I saw that Herbert suggested I had only set up two
parallel brokers, but I followed the instructions at
https://activemq.apache.org/components/classic/documentation/shared-file-system-master-slave
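
Long failover URIs are easy to break when they are wrapped or copied between
configs, so here is a minimal sketch of composing one from host:port pairs
without stray whitespace. The class name FailoverUri and its build method are
hypothetical helpers for illustration only, not part of ActiveMQ, and the
endpoints are the ones from my test setup.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not an ActiveMQ API): builds a failover transport URI.
final class FailoverUri {
    private FailoverUri() {}

    /** Joins host:port endpoints into failover:(tcp://a,tcp://b), trimming whitespace. */
    static String build(List<String> endpoints) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException("at least one endpoint is required");
        }
        return endpoints.stream()
                .map(String::trim)
                .map(e -> "tcp://" + e)
                .collect(Collectors.joining(",", "failover:(", ")"));
    }

    public static void main(String[] args) {
        // Endpoints of Brokers B and C in the test described in this thread.
        System.out.println(build(List.of("100.127.41.128:61616", "100.127.41.128:61617")));
    }
}
```

The resulting string can be passed to the ActiveMQConnectionFactory constructor
in the consumer, or embedded in the network connector URI on Broker A.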




On Mon, 26 Feb 2024 at 10:00, Bruce Cooper <[email protected]> wrote:

> As a follow up, I can now confirm that using a different client (an AMQP
> client written in javascript using the rhea library) works as expected.  It
> is just this Java process that is broken.  Any assistance you can provide
> would be greatly appreciated.
>
> On Mon, 26 Feb 2024 at 09:21, Bruce Cooper <[email protected]>
> wrote:
>
>> Hi Marco,
>>
>> Thanks for your response.  I agree it is strange.  I now have some more
>> information to share.  It now looks like it has something to do with my
>> message producer, rather than core ActiveMQ behaviour.
>>
>> If I send messages using the ActiveMQ console on Broker A, they are
>> transmitted and received correctly by the receiver.  If I send messages
>> using the attached program, it only works until the failover happens.  What
>> is super strange is that the producer isn't connecting to the failover pair
>> (B and C).  It is connected to the satellite (A).  Furthermore, that
>> process is ephemeral.  It is restarted each time a send is made.
>>
>> My test is now a bit different than before. I first set up the brokers
>> and initiate a "failover":
>>
>>    1. Start Brokers B and C configured to use the same data directory so
>>    that they are in a failover configuration.  Note that B is acting as the
>>    primary, and C is in standby
>>    2. Start Broker A, configured to point to Brokers B and C via the
>>    connection string
>>    static:failover:(tcp://100.127.41.128:61616,tcp://100.127.41.128:61617).
>>    Note that it connects to B
>>    3. Start consumer to consume from Brokers B and C using the same
>>    failover connection string. Note that it connects to B (in this example)
>>    4. Start producer to send to broker A.  Note that messages are
>>    received by the consumer.  Repeat to confirm that this continues to
>>    operate.
>>    5. Send a message using the console, and note that this also works.
>>    6. Stop Broker B.  Wait for both Broker A and the consumer to
>>    reconnect to C, once it has obtained its locks and started.
>>
>> Once failover has occurred, proving that sending from the console still
>> works:
>>
>>    1. Connect all processes together, do a failover, and wait for
>>    reconnection
>>    2. Send message in Broker A's Console, on the queue page
>>    3. Message is received by consumer
>>    4. See Enqueued and Dequeued metric go up (after a browser refresh) -
>>    This indicates that the message has been dequeued by the network
>>    connection and forwarded on to the HA pair.
>>    5. Check the console on Broker C and note that its enqueue and
>>    dequeue count have incremented.
>>
>> The above shows everything works as expected.  Now let us try my producer
>> program:
>>
>>    1. Run the main program below with argument "send" - This connects to
>>    the satellite broker (Broker A) and publishes a message
>>    2. Note that no message is received by the consumer connected to C
>>    (once the failover has occurred)
>>    3. Check Broker A's console queue page again, and see that the
>>    Enqueued and Dequeued metrics have incremented as before.
>>    4. Check the Broker C queue console page, and note that this time the
>>    Enqueued and Dequeued metrics have _not_ been incremented.  It is as if A
>>    thinks it has sent the message, but C never receives it.
>>
>> I feel like I must have misconfigured my producer code in some fashion,
>> but it was simply based on examples I found.  Is there something obvious I
>> have missed?
>>
>>
>> package au.com.mechination.integ.test;
>>
>> import java.util.Date;
>>
>> import javax.jms.Connection;
>> import javax.jms.DeliveryMode;
>> import javax.jms.Destination;
>> import javax.jms.JMSException;
>> import javax.jms.Message;
>> import javax.jms.MessageConsumer;
>> import javax.jms.MessageProducer;
>> import javax.jms.Session;
>> import javax.jms.TextMessage;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> /**
>> * Hello world!
>> *
>> */
>> public class Main {
>>     Connection conn = null;
>>     ActiveMQConnectionFactory connectionFactory;
>>     private Session session;
>>
>>     static final Logger logger = LoggerFactory.getLogger(Main.class);
>>
>>     public static void main(String[] args) throws JMSException {
>>         Main doer = new Main();
>>
>>         if (args.length > 0 && args[0].equals("send")) {
>>             doer.send();
>>         } else {
>>             doer.receive();
>>         }
>>     }
>>
>>     public void connect(String url, String username, String password) throws JMSException {
>>         final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>>
>>         // Pass the sign-in credentials.
>>         connectionFactory.setUserName(username);
>>         connectionFactory.setPassword(password);
>>
>>         // Establish a connection for the producer.
>>         this.conn = connectionFactory.createConnection();
>>         conn.start();
>>
>>         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>     }
>>
>>     void send() throws JMSException {
>>         String url = "tcp://localhost:61616";
>>         String username = "bruce";
>>         String password = "sekrit";
>>
>>         System.out.println("Sender connecting to " + url);
>>         connect(url, username, password);
>>
>>         // Create a queue named "TestQueue".
>>         final Destination producerDestination = session.createQueue("TestQueue");
>>
>>         // Create a producer from the session to the queue.
>>         final MessageProducer producer = session.createProducer(producerDestination);
>>         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>>         System.out.println("TTL is " + producer.getTimeToLive()); // Prints 0 - Unlimited
>>
>>         for (int i = 0; i < 1; i++) {
>>             String text = "Hello from Amazon MQ!: " + i + " " + new Date().toInstant().toString();
>>             System.out.println("Sending message " + text);
>>             TextMessage producerMessage = session.createTextMessage(text);
>>             producer.send(producerMessage);
>>         }
>>
>>         producer.close();
>>
>>         System.out.println("Send completed");
>>         System.exit(0);
>>     }
>>
>>     void receive() throws JMSException {
>>         String url = "failover:(tcp://100.127.41.128:61616,tcp://100.127.41.128:61617)";
>>         String username = "bruce";
>>         String password = "sekrit";
>>
>>         System.out.println("Receiver connecting to " + url);
>>         connect(url, username, password);
>>
>>         final Destination consumerQueue = session.createQueue("TestQueue");
>>         final MessageConsumer consumer = session.createConsumer(consumerQueue);
>>
>>         while (true) {
>>             final Message msg = consumer.receive(2500);
>>             if (msg == null) {
>>                 System.out.print(".");
>>             } else if (msg instanceof TextMessage) {
>>                 System.out.println("Received msg " + ((TextMessage) msg).getText());
>>             } else {
>>                 System.err.println("Something unexpected received");
>>             }
>>         }
>>     }
>> }
>>
>>
>>
>>
>>
>
> --
> https://mechination.com.au/
> Ph: 0448 341 729
>


-- 
https://mechination.com.au/
Ph: 0448 341 729
