On 1/30/08, gustav.mauer <[EMAIL PROTECTED]> wrote:
>
> I assume you only want to ACK a message once the consumer/worker thread has
> processed it? I have also been wondering what the best pattern is for that.
> I was also wondering if the correct way to proceed is to create a listener
> for each worker thread? And if a listener thread is busy, with the request
> be delivered to one of the idle listener threads, so that multiple messages
> can be worked on in parallel in the server.

Yes, thats exactly what I try to do. I'll try with listeners.


>
>
> Sebastjan Trepca wrote:
> >
> > Yes, sorry for not RTFM, it's not very logical though, to ACK all
> > messages when you ack one.
> >
> > So what are my options then? Stomp protocol doesn't seem to support
> > sessions:
> >
> > "The session-id header is a unique identifier for this session (though
> > it isn't actually used yet).
> > "
> > Should I create separate transaction for each message and group them with
> > that?
> >
> > I currently have only one listener that fills a local queue that gets
> > processed by consumer threads. Should I create a listener/consumer for
> > each thread?
> >
> > Thanks, Sebastjan
> >
> > On 1/30/08, gustav.mauer <[EMAIL PROTECTED]> wrote:
> >>
> >> I am under the impression this is correct behaviour. See for example:
> >> http://java.sun.com/products/jms/tutorial/  page 63
> >>
> >>
> >> Sebastjan Trepca wrote:
> >> >
> >> > Hi,
> >> >
> >> > I'm using ActiveMQ(both versions are affected) with STOMP protocol and
> >> > noticed a problem with it. At least I hope it's a problem.
> >> >
> >> > When you ACK a message through stomp, all messages until the right one
> >> > gets ACKed too.
> >> >
> >> > If we check the code:
> >> >
> >> >     protected void onStompAck(StompFrame command) throws
> >> ProtocolException
> >> > {
> >> >         checkConnected();
> >> >
> >> >         // TODO: acking with just a message id is very bogus
> >> >         // since the same message id could have been sent to 2
> >> different
> >> >         // subscriptions
> >> >         // on the same stomp connection. For example, when 2 subs are
> >> > created on
> >> >         // the same topic.
> >> >
> >> >         Map<String, String> headers = command.getHeaders();
> >> >         String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
> >> >         if (messageId == null) {
> >> >             throw new ProtocolException("ACK received without a
> >> > message-id to acknowledge!");
> >> >         }
> >> >
> >> >         TransactionId activemqTx = null;
> >> >         String stompTx = headers.get(Stomp.Headers.TRANSACTION);
> >> >         if (stompTx != null) {
> >> >             activemqTx = transactions.get(stompTx);
> >> >             if (activemqTx == null) {
> >> >                 throw new ProtocolException("Invalid transaction id: "
> >> > + stompTx);
> >> >             }
> >> >         }
> >> >
> >> >         boolean acked = false;
> >> >         for (Iterator<StompSubscription> iter =
> >> > subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
> >> >             StompSubscription sub = iter.next();
> >> >             MessageAck ack = sub.onStompMessageAck(messageId);
> >> >             if (ack != null) {
> >> >                 ack.setTransactionId(activemqTx);
> >> >                 sendToActiveMQ(ack, createResponseHandler(command));
> >> >                 acked = true;
> >> >                 break;
> >> >             }
> >> >         }
> >> >
> >> >         if (!acked) {
> >> >             throw new ProtocolException("Unexpected ACK received for
> >> > message-id [" + messageId + "]");
> >> >         }
> >> >
> >> >     }
> >> >
> >> >
> >> > synchronized MessageAck onStompMessageAck(String messageId) {
> >> >
> >> >         if (!dispatchedMessage.containsKey(messageId)) {
> >> >             return null;
> >> >         }
> >> >
> >> >         MessageAck ack = new MessageAck();
> >> >         ack.setDestination(consumerInfo.getDestination());
> >> >         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
> >> >         ack.setConsumerId(consumerInfo.getConsumerId());
> >> >
> >> >         int count = 0;
> >> >         for (Iterator iter = dispatchedMessage.entrySet().iterator();
> >> > iter.hasNext();) {
> >> >
> >> >             Map.Entry entry = (Entry)iter.next();
> >> >             String id = (String)entry.getKey();
> >> >             MessageId msgid = (MessageId)entry.getValue();
> >> >
> >> >             if (ack.getFirstMessageId() == null) {
> >> >                 ack.setFirstMessageId(msgid);
> >> >             }
> >> >
> >> >             iter.remove();
> >> >             count++;
> >> >
> >> >             if (id.equals(messageId)) {
> >> >                 ack.setLastMessageId(msgid);
> >> >                 break;
> >> >             }
> >> >
> >> >         }
> >> >
> >> >         ack.setMessageCount(count);
> >> >         return ack;
> >> >     }
> >> >
> >> > It loops through all messages and ACKs them until it finds the correct
> >> > message id. So when you send few messages and the last one is the
> >> > processed first all previous ones get ACKed?
> >> >
> >> > Is this a feature of STOMP protocol or is it a bug?
> >> >
> >> > Thanks, Sebastjan
> >> >
> >> >
> >>
> >> --
> >> View this message in context:
> >> http://www.nabble.com/Stomp-protocol-problems-tp15144123s2354p15177216.html
> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>
> >>
> >
> >
>
> --
> View this message in context: 
> http://www.nabble.com/Stomp-protocol-problems-tp15144123s2354p15182191.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


-- 
Sebastjan

Reply via email to