Hi,
Testing QPid JMS (AMQP v1.0) with Azure Service Bus (SB).
Trying to get durable subscriptions to work.
I have created a topic in SB
When starting my test app I get:
javax.jms.JMSException: The messaging entity
'mynamespace:topic:test.topic~15|test1' could not be found.
TrackingId:12ecc2a3-f8f3-42a3-8bd5-ad5d9823c367_B20,
SystemTracker:jjarkebo:topic:test.topic~15|test1, Timestamp:8/7/2017
1:23:34 PM TrackingId:b1af76b2b7a44b95ad2bd0e01f406507_G20,
SystemTracker:gateway6, Timestamp:8/7/2017 1:23:34 PM
at
org.apache.qpid.amqp_1_0.jms.impl.TopicSubscriberImpl.createClientReceiver(TopicSubscriberImpl.java:111)
at
org.apache.qpid.amqp_1_0.jms.impl.MessageConsumerImpl.<init>(MessageConsumerImpl.java:129)
at
org.apache.qpid.amqp_1_0.jms.impl.TopicSubscriberImpl.<init>(TopicSubscriberImpl.java:46)
at
org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createDurableSubscriber(SessionImpl.java:544)
at
org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createDurableSubscriber(SessionImpl.java:512)
at
org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createDurableSubscriber(SessionImpl.java:59)
at com.vcrs.test.AmqpJmsSubscriberApp.<init>(AmqpJmsSubscriberApp.java:39)
at com.vcrs.test.AmqpJmsSubscriberApp.main(AmqpJmsSubscriberApp.java:67)
Any ideas why this happen?
Do you have a working example?
My test app code below
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.qpid.amqp_1_0.jms.Connection;
import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
import org.apache.qpid.amqp_1_0.jms.Destination;
import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
import org.apache.qpid.amqp_1_0.jms.Session;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Hashtable;
import java.util.Random;
public class AmqpJmsSubscriberApp {
private Connection connection;
private Session subscriberSession;
private MessageConsumer subscriber;
public AmqpJmsSubscriberApp() throws Exception {
Hashtable<String, String> env = new Hashtable<String, String>();
env.put(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
env.put("connectionfactory.SBCF", "amqps://
MyUser:[email protected]:5671");
env.put("topic.TOPIC", "test.topic");
Context context = new InitialContext(env);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Topic topic = (Topic) context.lookup("TOPIC");
connection = cf.createConnection();
subscriberSession = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
subscriber = subscriberSession.createDurableSubscriber(topic,
"test1");
MessageListener messagelistener = new MessageListener()
{
public void onMessage(Message message)
{
try {
BytesMessage byteMsg = (BytesMessage)message;
byte[] byteArr = new byte[(int)byteMsg.getBodyLength()];
byteMsg.readBytes(byteArr);
String msg = new String(byteArr);
System.out.println("Received message with JMSMessageID =
" + message.getJMSMessageID());
System.out.println("You said " + msg);
message.acknowledge();
} catch (Exception e) {
e.printStackTrace();
}
}
};
subscriber.setMessageListener( messagelistener );
connection.start();
}
public static void main(String[] args) {
try {
AmqpJmsSubscriberApp simpleSubscriber = new AmqpJmsSubscriberApp();
System.out.println("Press [enter] to send a message. Type
'exit' + [enter] to quit.");
BufferedReader commandLine = new java.io.BufferedReader(new
InputStreamReader(System.in));
while (true) {
String s = commandLine.readLine();
if (s.equalsIgnoreCase("exit")) {
simpleSubscriber.close();
System.exit(0);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void close() throws JMSException {
connection.close();
}
}