Thanks for all the help. I'm now able to have a persistent queue using Derby. Is it more advantageous in a production environment to use BDB instead?
I found details on queue sizes. I see you can specify the max size and count of a queue, but what are the defaults if you don't specify? Will it just keep queuing them up forever if a consumer never attaches?

On 15 May 2012 19:18, Robbie Gemmell <[email protected]> wrote:

> Out of interest, can I ask what client/broker versions you are using?
> It would be useful to target advice accordingly, although my first
> suggestion is that you switch to using the 0.16 release candidate (it's
> under vote and well on course to being declared the final) if you
> aren't already, currently available at:
> http://people.apache.org/~jross/qpid-0.16-rc4/
>
> On 15 May 2012 21:05, Gary Ogden <[email protected]> wrote:
> > So I did some more reading and I now understand why the messages weren't
> > being retained. So I need to create a queue, and have that queue created in
> > the virtualhosts file so that the broker starts it up. But even after doing
> > that, I still have the issue where the messages are getting dropped until
> > the consumer starts.
>
> The issue here is that you are sending messages to an exchange with a
> given routing key, but that key (which is typically the queue name, or
> topic name/subject when using the JMS client) has not yet been used as
> a binding key to bind a queue to the exchange. In this case the
> exchange is unable to route the messages anywhere, and has thus
> discarded them (which is logged). When you connect your client, it
> must then be causing an appropriate binding to be established. I can
> think of 1 bug we resolved some time ago (but it may only now be
> included as part of 0.16) that would cause such annoyance when using
> queues in the configuration file in this regard, causing them not to
> be bound to the nameless 'default' exchange with their name, which is
> the binding used by the client's default addressing syntax when
> 'sending to Queues'.
> However, I notice your producer code below
> appears to be sending to the amq.direct exchange so it may not be
> related here.
>
> As an aside, you can create queues in the configuration files, through
> the broker's JMX management interface, or using message clients
> themselves.
>
> > Here's my VH file:
> >
> > <virtualhost>
> >     <name>blah</name>
> >     <blah>
> >         <queue>
> >             <name>blahqueue</name>
> >             <blahqueue>
> >                 <exchange>amq.direct</exchange>
> >                 <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
> >                 <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
> >                 <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
> >             </blahqueue>
> >         </queue>
> >     </blah>
> > </virtualhost>
>
> If this is truly all you have in there, your persistent messages will
> not be retained when you restart because it defaults to the
> MemoryMessageStore. There are two persistent store implementations,
> one based on Derby and an optional one based on BerkeleyDB Java
> Edition (which is available under the Sleepycat licence, which isn't
> compatible with the Apache Licence).
> You can configure their use on a
> per-virtualhost basis by inserting one of the following into a
> given virtualhost's configuration (see
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml?revision=1195211&view=co
> for context of a full config file), substituting the placeholders I
> added:
>
> MemoryMessageStore:
> <store>
>     <class>org.apache.qpid.server.store.MemoryMessageStore</class>
> </store>
>
> DerbyMessageStore:
> <store>
>     <class>org.apache.qpid.server.store.DerbyMessageStore</class>
>     <environment-path>PATH.TO.STORE.FILES/derbystore/VHOST.NAME</environment-path>
> </store>
>
> BDBMessageStore:
> <store>
>     <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
>     <environment-path>PATH.TO.STORE.FILES/bdbstore/VHOST.NAME</environment-path>
> </store>
>
> In order to use the last one, you would also need to download BDB JE
> from the Oracle website, and place the je-X.Y.Z.jar file into a
> <broker.install>/lib/opt/ directory where it will then get picked up.
>
> > Here's my properties file:
> >
> > java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
> > connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/blah?brokerlist='tcp://localhost:5672'
> > destination.queueExchange = amq.direct
>
> The property above is defining the address of an Exchange, as opposed
> to the specific Queue which you suggest above is what you actually
> want to send to (and what you are consuming from?
> It may be helpful to
> post the matching consumer code)
>
> The format for the address strings is described further at:
> http://qpid.apache.org/books/trunk/Programming-In-Apache-Qpid/html/section-addresses.html
> but a basic address string which defines a queue named blahqueue and
> tells the client never to create the queue (at the point of
> establishing a producer or consumer) might be:
>
> destination.queueAddress = blahqueue; {create: never, node: { type: queue } }
>
> > Here's a simple method to produce:
> >
> >     private void startProducing() {
> >         Context context = null;
> >         Connection connection = null;
> >         try {
> >             Properties properties = new Properties();
> >             properties.load(this.getClass().getResourceAsStream("blah.properties"));
> >             context = new InitialContext(properties);
> >
> >             ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionfactory");
> >             connection = connectionFactory.createConnection();
> >             connection.start();
> >
> >             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >             Destination destination = (AMQAnyDestination) context.lookup("queueExchange");
> >
> >             MessageProducer messageProducer = session.createProducer(destination);
> >             int count = 0;
> >             while (true) {
> >                 DataInputStream dis = new DataInputStream(new FileInputStream("test.xml"));
> >                 try {
> >                     File testFile = new File("test.xml");
> >                     long len = new File("test.xml").length();
> >                     System.out.println("test.xml length = " + len);
> >                     byte[] bytes = new byte[(int) len];
> >                     dis.readFully(bytes);
> >                     for (int i = 0; i <= 100; i++) {
> >                         count++;
> >                         System.out.println(" [x] Sending test.xml. Time = " + System.currentTimeMillis());
> >                         BytesMessage message = session.createBytesMessage();
> >                         message.writeBytes(bytes);
> >                         messageProducer.send(message);
> >                         System.out.println(" [x] Sent test.xml + " + count + ". Time = " + System.currentTimeMillis());
> >                     }
> >                 } finally {
> >                     dis.close();
> >                 }
> >                 Thread.sleep(4000);
> >             }
> >         } catch (Exception e) {
> >             System.out.println("Exception caught: " + e.getMessage() + e.getStackTrace().toString());
> >             try {
> >                 if (context != null) {
> >                     context.close();
> >                 }
> >                 if (connection != null) {
> >                     connection.close();
> >                 }
> >             } catch (Exception ex) {
> >                 System.out.println("Exception caught closing connection: " + e.getMessage());
> >             }
> >         }
> >     }
> >
> > What am I missing? It still waits for the consumer and no messages get
> > queued.
>
> Regards,
> Robbie
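
For anyone following the thread, here is a minimal sketch of the properties change Robbie describes: the destination entry names the queue (blahqueue) instead of the amq.direct exchange. The class name QueueAddressProperties and the inline PROPERTIES_TEXT constant are hypothetical and exist only so the snippet runs without the Qpid client jars; in the real producer the same three lines would live in blah.properties. The commented-out lookup at the end assumes the JNDI name queueAddress, taken from the destination.queueAddress key suggested above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class QueueAddressProperties {

    // Stand-in for the contents of blah.properties (hypothetical inline copy).
    // The first two entries are unchanged from the thread; the third replaces
    // "destination.queueExchange = amq.direct" with a queue address.
    static final String PROPERTIES_TEXT = String.join("\n",
        "java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory",
        "connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/blah?brokerlist='tcp://localhost:5672'",
        "destination.queueAddress = blahqueue; {create: never, node: { type: queue } }");

    // Parses the text the same way Properties.load() would parse the file.
    static Properties load() throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(PROPERTIES_TEXT));
        return properties;
    }

    public static void main(String[] args) throws IOException {
        Properties properties = load();
        System.out.println(properties.getProperty("destination.queueAddress"));

        // With the Qpid client on the classpath, the producer would then
        // (assumption: same flow as the quoted code, new lookup name) do:
        //   Context context = new InitialContext(properties);
        //   Destination destination = (Destination) context.lookup("queueAddress");
    }
}
```

Because the address says create: never, the queue still has to exist on the broker (for example via the virtualhosts file) before the producer connects.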
