Author: arnaudsimon
Date: Mon Aug  6 12:20:13 2007
New Revision: 563226

URL: http://svn.apache.org/viewvc?view=rev&rev=563226
Log:
Implemented queue browsing 

Added:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?view=diff&rev=563226&r1=563225&r2=563226
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 Mon Aug  6 12:20:13 2007
@@ -348,7 +348,7 @@
             // This indicate to the delivery thread to deliver the message to 
this consumer
             // as it can happens that a message is delivered after a receive 
operation as returned.
             _isReceiving = true;
-            boolean received = false;
+            int received = 0;
             if (!_isStopped)
             {
                 // if this consumer is stopped then this will be call when 
starting
@@ -356,7 +356,7 @@
                         .messageFlow(getMessageActorID(), 
org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
                 received = 
getSession().getQpidSession().messageFlush(getMessageActorID());
             }
-            if (!received && timeout < 0)
+            if ( received == 0 && timeout < 0)
             {
                 // this is a nowait and we havent received a message then we 
must immediatly return
                 result = null;
@@ -489,8 +489,8 @@
                             getSession().getQpidSession()
                                     .messageFlow(getMessageActorID(),
                                                  
org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-                            boolean received = 
getSession().getQpidSession().messageFlush(getMessageActorID());
-                            if (!received && _isNoWaitIsReceiving)
+                            int received = 
getSession().getQpidSession().messageFlush(getMessageActorID());
+                            if ( received == 0  && _isNoWaitIsReceiving)
                             {
                                 // Right a message nowait is waiting for a 
message
                                 // but no one can be delivered it then need to 
return

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java?view=auto&rev=563226
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
 Mon Aug  6 12:20:13 2007
@@ -0,0 +1,70 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.MessageListener;
+import org.apache.qpidity.api.Message;
+
+/**
+ * This listener idspatches messaes to its browser.
+ */
+public class QpidBrowserListener implements MessageListener
+{
+    /**
+     * Used for debugging.
+     */
+    private static final Logger _logger = 
LoggerFactory.getLogger(SessionImpl.class);
+
+    /**
+     * This message listener's browser.
+     */
+    QueueBrowserImpl _browser = null;
+
+      //---- constructor
+    /**
+     * Create a message listener wrapper for a given browser
+     *
+     * @param browser The browser of this listener
+     */
+    public QpidBrowserListener(QueueBrowserImpl browser)
+    {
+        _browser = browser;
+    }
+
+    //---- org.apache.qpidity.MessagePartListener API
+    /**
+     * Deliver a message to the listener.
+     *
+     * @param message The message delivered to the listner.
+     */
+    public void onMessage(Message message)
+    {
+        try
+        {
+            //convert this message into a JMS one
+            javax.jms.Message jmsMessage = null; // todo
+            _browser.receiveMessage(jmsMessage);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java?view=diff&rev=563226&r1=563225&r2=563226
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
 Mon Aug  6 12:20:13 2007
@@ -17,10 +17,18 @@
  */
 package org.apache.qpidity.jms;
 
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.MessagePartListener;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.impl.MessagePartListenerAdapter;
+
 import javax.jms.QueueBrowser;
 import javax.jms.JMSException;
 import javax.jms.Queue;
+import javax.jms.Message;
 import java.util.Enumeration;
+import java.util.NoSuchElementException;
 
 /**
  * Implementation of the JMS QueueBrowser interface
@@ -32,6 +40,36 @@
      */
     private String _messageSelector = null;
 
+    /**
+     * The message selector filter associated with this browser
+     */
+    private MessageFilter _filter = null;
+
+    /**
+     * The batch of messages to browse.
+     */
+    private Message[] _messages;
+
+    /**
+     * The number of messages read from current batch.
+     */
+    private int _browsed = 0;
+
+    /**
+     * The number of messages received from current batch.
+     */
+    private int _received = 0;
+
+    /**
+     * Indicates whether the last message has been received.
+     */
+    private int _batchLength;
+
+    /**
+     * The batch max size
+     */
+    private final int _maxbatchlength = 10;
+
     //--- constructor
 
     /**
@@ -40,13 +78,26 @@
      * @param session         The session of this browser.
      * @param queue           The queue name for this browser
      * @param messageSelector only messages with properties matching the 
message selector expression are delivered.
-     * @throws JMSException In case of internal problem when creating this 
browser.
+     * @throws Exception In case of internal problem when creating this 
browser.
      */
-    protected QueueBrowserImpl(SessionImpl session, Queue queue, String 
messageSelector) throws JMSException
+    protected QueueBrowserImpl(SessionImpl session, Queue queue, String 
messageSelector) throws Exception
     {
         super(session, (DestinationImpl) queue);
-        _messageSelector = messageSelector;
-        //-- TODO: Create the QPid browser
+        // this is an array representing a batch of messages for this browser.
+        _messages = new Message[_maxbatchlength];
+        if (messageSelector != null)
+        {
+            _messageSelector = messageSelector;
+            _filter = new JMSSelectorFilter(messageSelector);
+        }
+        MessagePartListener messageAssembler = new 
MessagePartListenerAdapter(new QpidBrowserListener(this));
+        // this is a queue we expect that this queue exists
+        getSession().getQpidSession()
+                .messageSubscribe(queue.getQueueName(), getMessageActorID(),
+                                  
org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED,
+                                  // We do not acquire those messages
+                                  
org.apache.qpidity.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null);
+
     }
 
     //--- javax.jms.QueueBrowser API
@@ -58,10 +109,11 @@
      */
     public Enumeration getEnumeration() throws JMSException
     {
-        // TODO
-        return null;
+        requestMessages();
+        return new MessageEnumeration();
     }
 
+
     /**
      * Get the queue associated with this queue browser.
      *
@@ -70,6 +122,7 @@
      */
     public Queue getQueue() throws JMSException
     {
+        checkNotClosed();
         return (Queue) _destination;
     }
 
@@ -81,6 +134,130 @@
      */
     public String getMessageSelector() throws JMSException
     {
+        checkNotClosed();
         return _messageSelector;
     }
+
+    //-- overwritten methods.  
+    /**
+     * Closes the browser and deregister it from its session.
+     *
+     * @throws JMSException if the MessaeActor cannot be closed due to some 
internal error.
+     */
+    public void close() throws JMSException
+    {
+        synchronized (_messages)
+        {
+            _received = 0;
+            _browsed = 0;
+            _batchLength = 0;
+            _messages.notify();
+        }
+        super.close();
+    }
+
+    //-- nonpublic methods
+    /**
+     * Request _maxbatchlength messages
+     *
+     * @throws JMSException If requesting more messages fails due to some 
internal error.
+     */
+    private void requestMessages() throws JMSException
+    {
+        _browsed = 0;
+        _received = 0;
+        // request messages
+        int received = 0;
+        try
+        {
+            getSession().getQpidSession()
+                    .messageFlow(getMessageActorID(), 
org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                                 _maxbatchlength);
+            _batchLength = 
getSession().getQpidSession().messageFlush(getMessageActorID());
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * This method is invoked by the listener when a message is dispatched to 
this browser.
+     *
+     * @param m A received message
+     */
+    protected void receiveMessage(Message m)
+    {
+        synchronized (_messages)
+        {
+            _messages[_received] = m;
+            _received++;
+            _messages.notify();
+        }
+    }
+
+    //-- inner class
+    /**
+     * This is an implementation of the Enumeration interface.
+     */
+    private class MessageEnumeration implements Enumeration
+    {
+        /*
+        * Whether this enumeration has any more elements.
+        *
+        * @return True if there any more elements.
+        */
+        public boolean hasMoreElements()
+        {
+            boolean result = false;
+            // Try to work out whether there are any more messages available.
+            try
+            {
+                if (_browsed >= _maxbatchlength)
+                {
+                    requestMessages();
+                }
+                synchronized (_messages)
+                {
+                    while (_received == _browsed && _batchLength > _browsed)
+                    {
+                        // we expect more messages
+                        _messages.wait();
+                    }
+                    if (_browsed < _received && _batchLength != _browsed)
+                    {
+                        result = true;
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                // If no batch could be returned, the result should be false, 
therefore do nothing
+            }
+            return result;
+        }
+
+        /**
+         * Get the next message element
+         *
+         * @return The next element.
+         */
+        public Object nextElement()
+        {
+            if (hasMoreElements())
+            {
+                synchronized (_messages)
+                {
+                    Message message = _messages[_browsed];
+                    _browsed = _browsed + 1;
+                    return message;
+                }
+            }
+            else
+            {
+                throw new NoSuchElementException();
+            }
+        }
+    }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?view=diff&rev=563226&r1=563225&r2=563226
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
 Mon Aug  6 12:20:13 2007
@@ -129,8 +129,7 @@
      * @param acknowledgeMode The session's acknowledgement mode. This value 
is ignored and set to
      *                        [EMAIL PROTECTED] Session#SESSION_TRANSACTED} if 
the <code>transacted</code> parameter is true.
      * @param isXA            Indicates whether this session is an XA session.
-     * @throws JMSSecurityException If the user could not be authenticated.
-     * @throws QpidException        In case of internal error.
+     * @throws QpidException In case of internal error.
      */
     protected SessionImpl(ConnectionImpl connection, boolean transacted, int 
acknowledgeMode, boolean isXA)
             throws QpidException
@@ -717,7 +716,15 @@
     {
         checkNotClosed();
         checkDestination(queue);
-        QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, 
messageSelector);
+        QueueBrowserImpl browser;
+        try
+        {
+            browser = new QueueBrowserImpl(this, queue, messageSelector);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
         // register this actor with the session
         _messageActors.put(browser.getMessageActorID(), browser);
         return browser;


Reply via email to