Author: ritchiem
Date: Wed Oct 18 00:26:13 2006
New Revision: 465166

URL: http://svn.apache.org/viewvc?view=rev&rev=465166
Log:
QPID-36 Added high/low water mark to DUPS_OK_ACKNOWLEDGE.

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Session.java
    
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java?view=diff&rev=465166&r1=465165&r2=465166
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
 Wed Oct 18 00:26:13 2006
@@ -35,12 +35,12 @@
 import javax.jms.*;
 import javax.jms.IllegalStateException;
 import java.io.Serializable;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.text.MessageFormat;
 
 public class AMQSession extends Closeable implements Session, QueueSession, 
TopicSession
 {
@@ -720,18 +720,18 @@
 
     public MessageConsumer createConsumer(Destination destination) throws 
JMSException
     {
-        return createConsumer(destination, _defaultPrefetchHighMark, false, 
false, null);
+        return createConsumer(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, false, false, null);
     }
 
     public MessageConsumer createConsumer(Destination destination, String 
messageSelector) throws JMSException
     {
-        return createConsumer(destination, _defaultPrefetchHighMark, false, 
false, messageSelector);
+        return createConsumer(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, false, false, messageSelector);
     }
 
     public MessageConsumer createConsumer(Destination destination, String 
messageSelector, boolean noLocal)
             throws JMSException
     {
-        return createConsumer(destination, _defaultPrefetchHighMark, noLocal, 
false, messageSelector);
+        return createConsumer(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, noLocal, false, messageSelector);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -740,7 +740,18 @@
                                           boolean exclusive,
                                           String selector) throws JMSException
     {
-        return createConsumer(destination, prefetch, noLocal, exclusive, 
selector, null);
+        return createConsumer(destination, prefetch, prefetch, noLocal, 
exclusive, selector, null);
+    }
+
+
+    public MessageConsumer createConsumer(Destination destination,
+                                          int prefetchHigh,
+                                          int prefetchLow,
+                                          boolean noLocal,
+                                          boolean exclusive,
+                                          String selector) throws JMSException
+    {
+        return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, 
exclusive, selector, null);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -750,12 +761,25 @@
                                           String selector,
                                           FieldTable rawSelector) throws 
JMSException
     {
-        return createConsumerImpl(destination, prefetch, noLocal, exclusive,
+        return createConsumerImpl(destination, prefetch, prefetch, noLocal, 
exclusive,
+                                  selector, rawSelector);
+    }
+
+    public MessageConsumer createConsumer(Destination destination,
+                                          int prefetchHigh,
+                                          int prefetchLow,
+                                          boolean noLocal,
+                                          boolean exclusive,
+                                          String selector,
+                                          FieldTable rawSelector) throws 
JMSException
+    {
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, 
noLocal, exclusive,
                                   selector, rawSelector);
     }
 
     protected MessageConsumer createConsumerImpl(final Destination destination,
-                                                 final int prefetch,
+                                                 final int prefetchHigh,
+                                                 final int prefetchLow,
                                                  final boolean noLocal,
                                                  final boolean exclusive,
                                                  final String selector,
@@ -780,7 +804,7 @@
                 }
                 BasicMessageConsumer consumer = new 
BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
                                                                          
_messageFactoryRegistry, AMQSession.this,
-                                                                         
protocolHandler, ft, prefetch, exclusive,
+                                                                         
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
                                                                          
_acknowledgeMode);
 
                 try
@@ -862,9 +886,10 @@
      * @param queueName
      * @return the consumer tag generated by the broker
      */
-    private String consumeFromQueue(String queueName, AMQProtocolHandler 
protocolHandler, int prefetch,
+    private String consumeFromQueue(String queueName, AMQProtocolHandler 
protocolHandler, int prefetchHigh, int prefetchLow,
                                     boolean noLocal, boolean exclusive, int 
acknowledgeMode) throws AMQException
     {
+        //fixme prefetch values are not used here. Do we need to have them as 
parameters?
         //need to generate a consumer tag on the client so we can exploit the 
nowait flag
         String tag = Integer.toString(_nextTag++);
 
@@ -1118,8 +1143,8 @@
 
         bindQueue(amqd, queueName, protocolHandler, 
consumer.getRawSelectorFieldTable());
 
-        String consumerTag = consumeFromQueue(queueName, protocolHandler, 
consumer.getPrefetch(), consumer.isNoLocal(),
-                                              consumer.isExclusive(), 
consumer.getAcknowledgeMode());
+        String consumerTag = consumeFromQueue(queueName, protocolHandler, 
consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
+                                              consumer.isNoLocal(), 
consumer.isExclusive(), consumer.getAcknowledgeMode());
 
         consumer.setConsumerTag(consumerTag);
         _consumers.put(consumerTag, consumer);

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=465166&r1=465165&r2=465166
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
 Wed Oct 18 00:26:13 2006
@@ -91,9 +91,14 @@
     private FieldTable _rawSelectorFieldTable;
 
     /**
-     * We store the prefetch field in order to be able to reuse it when 
resubscribing in the event of failover
+     * We store the high water prefetch field in order to be able to reuse it 
when resubscribing in the event of failover
      */
-    private int _prefetch;
+    private int _prefetchHigh;
+
+    /**
+     * We store the low water prefetch field in order to be able to reuse it 
when resubscribing in the event of failover
+     */
+    private int _prefetchLow;
 
     /**
      * We store the exclusive field in order to be able to reuse it when 
resubscribing in the event of failover
@@ -118,10 +123,16 @@
      */
     private long _lastDeliveryTag;
 
+    /**
+     * Switch to enable sending of acknowledgements when using 
DUPS_OK_ACKNOWLEDGE mode.
+     * Enabled when _outstanding number of msgs >= _prefetchHigh and disabled 
at <= _prefetchLow
+     */
+    private boolean _dups_ok_acknowledge_send;
+
     BasicMessageConsumer(int channelId, AMQConnection connection, 
AMQDestination destination, String messageSelector,
                          boolean noLocal, MessageFactoryRegistry 
messageFactory, AMQSession session,
-                         AMQProtocolHandler protocolHandler, FieldTable 
rawSelectorFieldTable, int prefetch,
-                         boolean exclusive, int acknowledgeMode)
+                         AMQProtocolHandler protocolHandler, FieldTable 
rawSelectorFieldTable,
+                         int prefetchHigh, int prefetchLow, boolean exclusive, 
int acknowledgeMode)
     {
         _channelId = channelId;
         _connection = connection;
@@ -132,7 +143,8 @@
         _session = session;
         _protocolHandler = protocolHandler;
         _rawSelectorFieldTable = rawSelectorFieldTable;
-        _prefetch = prefetch;
+        _prefetchHigh = prefetchHigh;
+        _prefetchLow = prefetchLow;
         _exclusive = exclusive;
         _acknowledgeMode = acknowledgeMode;
     }
@@ -232,7 +244,17 @@
 
     public int getPrefetch()
     {
-        return _prefetch;
+        return _prefetchHigh;
+    }
+
+    public int getPrefetchHigh()
+    {
+        return _prefetchHigh;
+    }
+
+    public int getPrefetchLow()
+    {
+        return _prefetchLow;
     }
 
     public boolean isNoLocal()
@@ -309,10 +331,11 @@
     /**
      * We can get back either a Message or an exception from the queue. This 
method examines the argument and deals
      * with it by throwing it (if an exception) or returning it (in any other 
case).
+     *
      * @param o
      * @return a message only if o is a Message
      * @throws JMSException if the argument is a throwable. If it is a 
JMSException it is rethrown as is, but if not
-     * a JMSException is created with the linked exception set appropriately
+     *                      a JMSException is created with the linked 
exception set appropriately
      */
     private AbstractJMSMessage returnMessageOrThrow(Object o)
             throws JMSException
@@ -335,7 +358,7 @@
 
     public void close() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             if (!_closed.getAndSet(true))
             {
@@ -370,8 +393,9 @@
     /**
      * Called from the AMQSession when a message has arrived for this 
consumer. This methods handles both the case
      * of a message listener or a synchronous receive() caller.
+     *
      * @param messageFrame the raw unprocessed mesage
-     * @param channelId channel on which this message was sent
+     * @param channelId    channel on which this message was sent
      */
     void notifyMessage(UnprocessedMessage messageFrame, int channelId)
     {
@@ -435,7 +459,16 @@
         switch (_acknowledgeMode)
         {
             case Session.DUPS_OK_ACKNOWLEDGE:
-                if (++_outstanding >= _prefetch)
+                if (++_outstanding >= _prefetchHigh)
+                {
+                    _dups_ok_acknowledge_send = true;
+                }
+                if (_outstanding <= _prefetchLow)
+                {
+                    _dups_ok_acknowledge_send = false;
+                }
+
+                if (_dups_ok_acknowledge_send)
                 {
                     _session.acknowledgeMessage(msg.getDeliveryTag(), true);
                 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Session.java?view=diff&rev=465166&r1=465165&r2=465166
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Session.java 
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Session.java 
Wed Oct 18 00:26:13 2006
@@ -42,6 +42,13 @@
                                    boolean exclusive,
                                    String selector) throws JMSException;
 
+       MessageConsumer createConsumer(Destination destination,
+                                   int prefetchHigh,
+                                   int prefetchLow,
+                                   boolean noLocal,
+                                   boolean exclusive,
+                                   String selector) throws JMSException;
+
     /**
      * @return the prefetch value used by default for consumers created on 
this session.
      */

Modified: 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java?view=diff&rev=465166&r1=465165&r2=465166
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java
 Wed Oct 18 00:26:13 2006
@@ -17,10 +17,15 @@
  */
 package org.apache.qpid.headers;
 
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.FieldTable;
 
-import javax.jms.*;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
 
 /**
  */
@@ -46,7 +51,7 @@
         }
         _session = session;
         _payload = new byte[payloadSize];
-        for(int i = 0; i < _payload.length; i++)
+        for (int i = 0; i < _payload.length; i++)
         {
             _payload[i] = (byte) DATA[i % DATA.length];
         }
@@ -156,7 +161,7 @@
 
     private static Message setHeaders(Message m, String[] headers) throws 
JMSException
     {
-        for(int i = 0; i < headers.length; i++)
+        for (int i = 0; i < headers.length; i++)
         {
             // the value in GRM is 5 bytes
             m.setStringProperty(headers[i], "value");


Reply via email to