On 1/30/08, gustav.mauer <[EMAIL PROTECTED]> wrote:
>
> I assume you only want to ACK a message once the consumer/worker thread has
> processed it? I have also been wondering what the best pattern is for that.
> I was also wondering if the correct way to proceed is to create a listener
> for each worker thread? And if a listener thread is busy, will the request
> be delivered to one of the idle listener threads, so that multiple messages
> can be worked on in parallel in the server?

Yes, that's exactly what I'm trying to do. I'll try with listeners.

>
> Sebastjan Trepca wrote:
> >
> > Yes, sorry for not RTFM, it's not very logical though, to ACK all
> > messages when you ack one.
> >
> > So what are my options then? Stomp protocol doesn't seem to support
> > sessions:
> >
> > "The session-id header is a unique identifier for this session (though
> > it isn't actually used yet)."
> >
> > Should I create a separate transaction for each message and group them
> > with that?
> >
> > I currently have only one listener that fills a local queue that gets
> > processed by consumer threads. Should I create a listener/consumer for
> > each thread?
> >
> > Thanks, Sebastjan
> >
> > On 1/30/08, gustav.mauer <[EMAIL PROTECTED]> wrote:
> >>
> >> I am under the impression this is correct behaviour. See for example:
> >> http://java.sun.com/products/jms/tutorial/ page 63
> >>
> >>
> >> Sebastjan Trepca wrote:
> >> >
> >> > Hi,
> >> >
> >> > I'm using ActiveMQ (both versions are affected) with STOMP protocol and
> >> > noticed a problem with it. At least I hope it's a problem.
> >> >
> >> > When you ACK a message through stomp, all messages up to the right one
> >> > get ACKed too.
> >> >
> >> > If we check the code:
> >> >
> >> > protected void onStompAck(StompFrame command) throws ProtocolException {
> >> >     checkConnected();
> >> >
> >> >     // TODO: acking with just a message id is very bogus
> >> >     // since the same message id could have been sent to 2 different
> >> >     // subscriptions
> >> >     // on the same stomp connection. For example, when 2 subs are created on
> >> >     // the same topic.
> >> >
> >> >     Map<String, String> headers = command.getHeaders();
> >> >     String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
> >> >     if (messageId == null) {
> >> >         throw new ProtocolException("ACK received without a message-id to acknowledge!");
> >> >     }
> >> >
> >> >     TransactionId activemqTx = null;
> >> >     String stompTx = headers.get(Stomp.Headers.TRANSACTION);
> >> >     if (stompTx != null) {
> >> >         activemqTx = transactions.get(stompTx);
> >> >         if (activemqTx == null) {
> >> >             throw new ProtocolException("Invalid transaction id: " + stompTx);
> >> >         }
> >> >     }
> >> >
> >> >     boolean acked = false;
> >> >     for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
> >> >         StompSubscription sub = iter.next();
> >> >         MessageAck ack = sub.onStompMessageAck(messageId);
> >> >         if (ack != null) {
> >> >             ack.setTransactionId(activemqTx);
> >> >             sendToActiveMQ(ack, createResponseHandler(command));
> >> >             acked = true;
> >> >             break;
> >> >         }
> >> >     }
> >> >
> >> >     if (!acked) {
> >> >         throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
> >> >     }
> >> >
> >> > }
> >> >
> >> >
> >> > synchronized MessageAck onStompMessageAck(String messageId) {
> >> >
> >> >     if (!dispatchedMessage.containsKey(messageId)) {
> >> >         return null;
> >> >     }
> >> >
> >> >     MessageAck ack = new MessageAck();
> >> >     ack.setDestination(consumerInfo.getDestination());
> >> >     ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
> >> >     ack.setConsumerId(consumerInfo.getConsumerId());
> >> >
> >> >     int count = 0;
> >> >     for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
> >> >
> >> >         Map.Entry entry = (Entry)iter.next();
> >> >         String id = (String)entry.getKey();
> >> >         MessageId msgid = (MessageId)entry.getValue();
> >> >
> >> >         if (ack.getFirstMessageId() == null) {
> >> >             ack.setFirstMessageId(msgid);
> >> >         }
> >> >
> >> >         iter.remove();
> >> >         count++;
> >> >
> >> >         if (id.equals(messageId)) {
> >> >             ack.setLastMessageId(msgid);
> >> >             break;
> >> >         }
> >> >
> >> >     }
> >> >
> >> >     ack.setMessageCount(count);
> >> >     return ack;
> >> > }
> >> >
> >> > It loops through all messages and ACKs them until it finds the correct
> >> > message id. So when you send a few messages and the last one is
> >> > processed first, all previous ones get ACKed?
> >> >
> >> > Is this a feature of STOMP protocol or is it a bug?
> >> >
> >> > Thanks, Sebastjan
> >> >
> >>
> >> --
> >> View this message in context:
> >> http://www.nabble.com/Stomp-protocol-problems-tp15144123s2354p15177216.html
> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>
> >
>
> --
> View this message in context:
> http://www.nabble.com/Stomp-protocol-problems-tp15144123s2354p15182191.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

--
Sebastjan
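
For anyone following the thread: the cumulative behaviour of the quoted onStompMessageAck loop can be reproduced outside the broker with a small standalone sketch. This is not ActiveMQ code. The class CumulativeAckDemo and its methods (dispatch, ackUpTo, ackOnly, pending) are hypothetical names made up for illustration, and it assumes the dispatched messages are tracked in delivery order, which is what the quoted loop appears to rely on. ackOnly is only there for comparison; it is not something the quoted code offers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a single STOMP subscription; not ActiveMQ code.
class CumulativeAckDemo {

    // Assumption: un-acked messages are kept in delivery (insertion) order.
    private final Map<String, String> dispatched = new LinkedHashMap<>();

    // Record a message as delivered to the client but not yet acknowledged.
    void dispatch(String messageId) {
        dispatched.put(messageId, messageId);
    }

    // Same shape as the quoted loop: remove every entry up to and
    // including messageId, and report which ids were acknowledged.
    List<String> ackUpTo(String messageId) {
        List<String> acked = new ArrayList<>();
        if (!dispatched.containsKey(messageId)) {
            return acked; // unknown id: nothing is acknowledged
        }
        Iterator<String> iter = dispatched.keySet().iterator();
        while (iter.hasNext()) {
            String id = iter.next();
            iter.remove();
            acked.add(id);
            if (id.equals(messageId)) {
                break;
            }
        }
        return acked;
    }

    // For comparison only: acknowledge exactly one message.
    boolean ackOnly(String messageId) {
        return dispatched.remove(messageId) != null;
    }

    // Ids that are still waiting for an acknowledgement, in delivery order.
    List<String> pending() {
        return new ArrayList<>(dispatched.keySet());
    }

    public static void main(String[] args) {
        CumulativeAckDemo sub = new CumulativeAckDemo();
        sub.dispatch("m1");
        sub.dispatch("m2");
        sub.dispatch("m3");

        // A worker finishes m3 first and ACKs it.
        System.out.println("acked by ACK(m3): " + sub.ackUpTo("m3"));
        System.out.println("still pending:    " + sub.pending());
    }
}
```

In this sketch, ACKing m3 first also removes m1 and m2 even though no worker has finished them, which is the situation described in the original question. With ackOnly("m3") the other two would stay pending.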