Author: rajith
Date: Thu Aug 23 21:06:18 2007
New Revision: 569238

URL: http://svn.apache.org/viewvc?rev=569238&view=rev
Log:
Fixed the following issues
1) TopicImpl doesn't populate the routing key properly.
The Destination Impl needs to have a routing key field (I added the
field).
For Topic The queue name is generated.
For Queue the routingkey is same as queue name.

2) QpidMessage - Calling flip on messageData resets the limit to zero in
beforeMessageDispatch().  I commented out the flip()

3) QpidMessage - setMessageData
Instead of _messageData = messageBody, I modified it to do
_messageData = messageBody.duplicate();

4) MessageActorId is not set properly - so I modified the code to set
this. This id is used for the destination

5) When creating BytesMessageImpl, in the constructor, it doesn't  read from 
the underlying
message impl. There for the _readIn is null and results in 
MessageNotReadableException.
I added a temp solution to read and populate _readIn.
However need to revisit it later


Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
 Thu Aug 23 21:06:18 2007
@@ -108,7 +108,6 @@
         ClientSession ssn = new ClientSession();
         ssn.attach(ch);
         ssn.sessionOpen(expiryInSeconds);
-        
         return ssn;
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
 Thu Aug 23 21:06:18 2007
@@ -29,7 +29,7 @@
         {
             for (long l = range.getLower(); l <= range.getUpper(); l++)
             {
-                System.out.println("Acknowleding message for : " + 
super.getCommand((int) l));
+                System.out.println("Acknowleding transfer id : " + l);
                 super.processed(l);
             }
         }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
 Thu Aug 23 21:06:18 2007
@@ -67,10 +67,12 @@
         }
         ((ClientSession)session).setRejectedMessages(struct.getTransfers());
         ((ClientSession)session).notifyException(new QpidException("Message 
Rejected",ErrorCode.MESSAGE_REJECTED,null));
+        session.processed(struct);
     }
     
     @Override public void messageAcquired(Session session, MessageAcquired 
struct) 
     {
         ((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+        session.processed(struct);
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
 Thu Aug 23 21:06:18 2007
@@ -30,7 +30,8 @@
 
     public ByteBufferMessage()
     {
-       
+        _currentDeliveryProps = new DeliveryProperties();
+        _currentMessageProps = new MessageProperties();
     }
 
     public ByteBufferMessage(long transferId)
@@ -70,6 +71,7 @@
 
     public MessageProperties getMessageProperties()
     {
+        System.out.println("MessageProperties is null ? " + 
_currentMessageProps == null? "true":"false");
         return _currentMessageProps;
     }
     

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
 Thu Aug 23 21:06:18 2007
@@ -68,6 +68,8 @@
      * Indicates whether this destination is durable
      */
     protected boolean _isDurable;
+    
+    protected String _routingKey;
 
     /**
      * The biding URL used to create this destiantion
@@ -79,6 +81,7 @@
     protected DestinationImpl(String name) throws QpidException
     {
        _queueName = name;
+       _routingKey = name;
     }
 
     /**
@@ -96,6 +99,7 @@
         _isAutoDelete = 
Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
         _isDurable = 
Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
         _queueName = binding.getQueueName();
+        _routingKey = binding.getQueueName();
         _url = binding;
     }
 
@@ -171,6 +175,11 @@
         return _isAutoDelete;
     }
 
+    public String getRoutingKey()
+    {
+        return _routingKey;
+    }
+    
     /**
      * Indicates whether this destination is Durable.
      *

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
 Thu Aug 23 21:06:18 2007
@@ -63,15 +63,16 @@
 
     //TODO define the parameters
 
-    protected MessageActor()
+    protected MessageActor(String messageActorID)
     {
-
+        _messageActorID = messageActorID;
     }
 
-    protected MessageActor(SessionImpl session, DestinationImpl destination)
+    protected MessageActor(SessionImpl session, DestinationImpl 
destination,String messageActorID)
     {
         _session = session;
         _destination = destination;
+        _messageActorID = messageActorID;
     }
 
     //--- public methods (part of the jms public API)

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 Thu Aug 23 21:06:18 2007
@@ -112,9 +112,9 @@
      * @throws Exception If the MessageProducerImpl cannot be created due to 
some internal error.
      */
     protected MessageConsumerImpl(SessionImpl session, DestinationImpl 
destination, String messageSelector,
-                                  boolean noLocal, String subscriptionName) 
throws Exception
+                                  boolean noLocal, String 
subscriptionName,String consumerTag) throws Exception
     {
-        super(session, destination);
+        super(session, destination,consumerTag);
         if (messageSelector != null)
         {
             _messageSelector = messageSelector;
@@ -167,7 +167,7 @@
             }
             // bind this queue with the topic exchange
             getSession().getQpidSession()
-                    .queueBind(queueName, 
ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getQpidQueueName(), null);
+                    .queueBind(queueName, 
ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null);
             // subscribe to this topic 
             getSession().getQpidSession()
                     .messageSubscribe(queueName, getMessageActorID(),
@@ -183,6 +183,13 @@
         // set the flow mode
         getSession().getQpidSession()
                 .messageFlowMode(getMessageActorID(), 
org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
+        
+        // this will prevent the broker from sending more than one message
+        // When a messageListener is set the flow will be adjusted.
+        // until then we assume it's for synchronous message consumption
+        getSession().getQpidSession()
+        .messageFlow(getMessageActorID(), 
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
+        
         getSession().getQpidSession().sync();
         // check for an exception
         if (getSession().getCurrentException() != null)
@@ -347,12 +354,21 @@
     private Message internalReceive(long timeout) throws Exception
     {
         checkNotClosed();
+        Message result = null;
+        
         if (_messageListener != null)
         {
             throw new javax.jms.IllegalStateException("A listener has already 
been set.");
         }
-
-        Message result = null;
+        
+        if (_incomingMessage != null)
+        {
+            System.out.println("We already had a message in the queue");
+            result = (Message) _incomingMessage;
+            _incomingMessage = null;
+            return result;
+        }
+       
         synchronized (_incomingMessageLock)
         {
             // This indicate to the delivery thread to deliver the message to 
this consumer
@@ -366,11 +382,14 @@
                                      1);
                 
getSession().getQpidSession().messageFlush(getMessageActorID());
                 _messageReceived.set(false);
-                
+                System.out.println("no message in the queue, issuing a flow(1) 
and waiting for message");
+                                
                 //When sync() returns we know whether we have received a 
message or not.
                 getSession().getQpidSession().sync();
+                
+                System.out.println("we got returned from sync()"); 
                 //received = getSession().getQpidSession().messagesReceived();
-            }
+            }            
             if (_messageReceived.get() && timeout < 0)
             {
                 // this is a nowait and we havent received a message then we 
must immediatly return
@@ -387,6 +406,7 @@
                 {
                     try
                     {
+                        System.out.println("waiting for message");             
           
                         _incomingMessageLock.wait(timeout);
                     }
                     catch (InterruptedException e)
@@ -479,18 +499,24 @@
             // notify the waiting thread
             if (_messageListener == null)
             {
+                System.out.println("Received a message- onMessage in message 
consumer Impl");
+                
                 synchronized (_incomingMessageLock)
                 {
                     if (messageOk)
                     {
+                        System.out.println("Received a message- onMessage in 
message ok " + messageOk);
                         // we have received a proper message that we can 
deliver
                         if (_isReceiving)
                         {
+                            System.out.println("Received a message- onMessage 
in message _isReceiving");
+                            
                             _incomingMessage = message;
                             _incomingMessageLock.notify();
                         }
                         else
                         {
+                            System.out.println("Received a message- onMessage 
in message releasing");
                             // this message has been received after a received 
as returned
                             // we need to release it
                             releaseMessage(message);

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
 Thu Aug 23 21:06:18 2007
@@ -58,7 +58,7 @@
     //-- constructors
     public MessageProducerImpl(SessionImpl session, DestinationImpl 
destination)
     {
-        super(session, destination);
+        super(session, destination,"");
     }
 
     //--- Interface javax.jms.MessageProducer

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
 Thu Aug 23 21:06:18 2007
@@ -17,10 +17,13 @@
  */
 package org.apache.qpidity.jms;
 
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.jms.message.MessageFactory;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.qpidity.api.Message;
 import org.apache.qpidity.client.util.MessageListener;
+import org.apache.qpidity.jms.message.MessageFactory;
+import org.apache.qpidity.jms.message.QpidMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,8 +32,12 @@
  * This is for guarantying that asynch messages are sequentially processed 
within their session.
  * <p> when used synchonously, messages are dispatched to the receiver itself.
  */
-public class QpidMessageListener implements MessageListener
+public class QpidMessageListener implements MessageListener, Runnable
 {
+    
+    // temp solution
+    LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>();
+    
     /**
      * Used for debugging.
      */
@@ -50,37 +57,53 @@
     public QpidMessageListener(MessageConsumerImpl consumer)
     {
         _consumer = consumer;
+        Thread t = new Thread(this);
+        t.start();
     }
-
-    //---- org.apache.qpidity.MessagePartListener API
-    /**
-     * Deliver a message to the listener.
-     *
-     * @param message The message delivered to the listner.
-     */
-    public void onMessage(Message message)
+    
+    public void run()
     {
         try
         {
-            // to be used with flush
-            _consumer.notifyMessageReceived();
-            
-            //convert this message into a JMS one
-            QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
-            // if consumer is asynchronous then send this message to its 
session.
-            if( _consumer.getMessageListener() != null )
+            while(true)
             {
-                
_consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), 
jmsMessage);
-            }
-            else
-            {
-                // deliver this message to the consumer itself
-                _consumer.onMessage(jmsMessage);
+                System.out.println("trying to take a message message");
+                Message message = _queue.take();
+                    
+                // to be used with flush
+                System.out.println("processing the message");
+                _consumer.notifyMessageReceived();
+                            
+                //convert this message into a JMS one
+                QpidMessage jmsMessage = 
MessageFactory.getQpidMessage(message);
+                // if consumer is asynchronous then send this message to its 
session.
+                if( _consumer.getMessageListener() != null )
+                {
+                    
_consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), 
jmsMessage);
+                }
+                else
+                {
+                    // deliver this message to the consumer itself
+                    _consumer.onMessage(jmsMessage);
+                }
             }
         }
         catch (Exception e)
         {
             throw new RuntimeException(e.getMessage());
         }
+    }
+
+    //---- org.apache.qpidity.MessagePartListener API
+    /**
+     * Deliver a message to the listener.
+     *
+     * @param message The message delivered to the listner.
+     */
+    public void onMessage(Message message)
+    {
+        System.out.println("Received a message");
+        _queue.offer(message);
+        System.out.println("Added queue to the message");
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
 Thu Aug 23 21:06:18 2007
@@ -80,9 +80,9 @@
      * @param messageSelector only messages with properties matching the 
message selector expression are delivered.
      * @throws Exception In case of internal problem when creating this 
browser.
      */
-    protected QueueBrowserImpl(SessionImpl session, Queue queue, String 
messageSelector) throws Exception
+    protected QueueBrowserImpl(SessionImpl session, Queue queue, String 
messageSelector,String consumerTag) throws Exception
     {
-        super(session, (DestinationImpl) queue);
+        super(session, (DestinationImpl) queue,consumerTag);
         // this is an array representing a batch of messages for this browser.
         _messages = new Message[_maxbatchlength];
         if (messageSelector != null)

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
 Thu Aug 23 21:06:18 2007
@@ -35,9 +35,9 @@
      * @param messageSelector the message selector for this QueueReceiverImpl. 
 
      * @throws Exception If the QueueReceiverImpl cannot be created due to 
some internal error.
      */
-    protected QueueReceiverImpl(SessionImpl session, Queue queue, String 
messageSelector) throws Exception
+    protected QueueReceiverImpl(SessionImpl session, Queue queue, String 
messageSelector,String consumerTag) throws Exception
     {
-        super(session, (DestinationImpl) queue, messageSelector, false, null);
+        super(session, (DestinationImpl) queue, messageSelector, false, 
null,consumerTag);
     }
 
     //--- Interface  QueueReceiver

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
 Thu Aug 23 21:06:18 2007
@@ -128,7 +128,7 @@
         QueueReceiver receiver;
         try
         {
-            receiver =  new QueueReceiverImpl(this, queue, messageSelector);
+            receiver =  new QueueReceiverImpl(this, queue, 
messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
         }
         catch (Exception e)
         {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
 Thu Aug 23 21:06:18 2007
@@ -25,12 +25,11 @@
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
 import java.io.Serializable;
 import java.util.LinkedList;
 import java.util.HashMap;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Implementation of the JMS Session interface
@@ -123,6 +122,12 @@
      * This session connection
      */
     private ConnectionImpl _connection;
+    
+    /**
+     * This will be used as the message actor id
+     * This in turn will be set as the destination
+     */
+    protected AtomicInteger _consumerTag = new AtomicInteger();
 
     //--- Constructor
     /**
@@ -594,7 +599,7 @@
         MessageConsumerImpl consumer;
         try
         {
-            consumer = new MessageConsumerImpl(this, (DestinationImpl) 
destination, messageSelector, noLocal, null);
+            consumer = new MessageConsumerImpl(this, (DestinationImpl) 
destination, messageSelector, noLocal, 
null,String.valueOf(_consumerTag.incrementAndGet()));
         }
         catch (Exception e)
         {
@@ -721,7 +726,7 @@
         try
         {
             subscriber = new TopicSubscriberImpl(this, topic, messageSelector, 
noLocal,
-                                                 _connection.getClientID() + 
":" + name);
+                                                 _connection.getClientID() + 
":" + name,String.valueOf(_consumerTag.incrementAndGet()));
         }
         catch (Exception e)
         {
@@ -765,7 +770,7 @@
         QueueBrowserImpl browser;
         try
         {
-            browser = new QueueBrowserImpl(this, queue, messageSelector);
+            browser = new QueueBrowserImpl(this, queue, 
messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
         }
         catch (Exception e)
         {
@@ -1114,7 +1119,7 @@
      */
     protected void testQpidException() throws QpidException
     {
-        _qpidSession.sync();
+        //_qpidSession.sync();
         QpidException qe = getCurrentException();
         if (qe != null)
         {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
 Thu Aug 23 21:06:18 2007
@@ -42,6 +42,7 @@
     {
         super(name);
         _queueName = "Topic-" + UUID.randomUUID();
+        _routingKey = name;
         _destinationName = name;
         _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
         _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -61,6 +62,7 @@
     {
         super(name);
         _queueName = "Topic-" + UUID.randomUUID();
+        _routingKey = name;
         _destinationName = name;
         _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
         _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -116,7 +118,11 @@
         // test if this exchange exist on the broker
         session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, 
null, null, Option.PASSIVE);
         // wait for the broker response
+        System.out.println("Checking for exchange");
+        
         session.getQpidSession().sync();
+        
+        System.out.println("Calling sync()");
         // todo get the exception
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
 Thu Aug 23 21:06:18 2007
@@ -144,7 +144,7 @@
         TopicSubscriber topicSubscriber;
         try
         {
-            topicSubscriber = new TopicSubscriberImpl(this, topic, 
messageSelector, noLocal, null);
+            topicSubscriber = new TopicSubscriberImpl(this, topic, 
messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet()));
         }
         catch (Exception e)
         {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
 Thu Aug 23 21:06:18 2007
@@ -39,9 +39,9 @@
      * @throws Exception If the TopicSubscriberImpl cannot be created due to 
internal error.
      */
     protected TopicSubscriberImpl(SessionImpl session, Topic topic, String 
messageSelector, boolean noLocal,
-                                  String subscriptionName) throws Exception
+                                  String subscriptionName,String consumerTag) 
throws Exception
     {
-        super(session, (DestinationImpl) topic, messageSelector, noLocal, 
subscriptionName);
+        super(session, (DestinationImpl) topic, messageSelector, noLocal, 
subscriptionName,consumerTag);
     }
 
     //---  javax.jms.TopicSubscriber interface

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
 Thu Aug 23 21:06:18 2007
@@ -70,6 +70,17 @@
     protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws 
QpidException
     {
         super(message);
+        try
+        {
+            ByteBuffer b = message.readData();
+            byte[] a = new byte[b.limit()];
+            b.get(a);
+            _dataIn = new DataInputStream(new ByteArrayInputStream(a));
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
+        }
     }
 
     //--- BytesMessage API

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
 Thu Aug 23 21:06:18 2007
@@ -64,6 +64,7 @@
     {
         // We us a byteBufferMessage as default
         _qpidityMessage = new ByteBufferMessage();
+        System.out.println("Creating a bytes message");
         _messageProperties = new HashMap<String, Object>();
         // This is a newly created messsage so the data is empty
         _messageData = ByteBuffer.allocate(1024);
@@ -325,8 +326,8 @@
      * @param messageBody The buffer containing this message data
      */
     protected void setMessageData(ByteBuffer messageBody)
-    {
-        _messageData = messageBody;
+    {        
+        _messageData = messageBody.duplicate();
     }
 
     /**
@@ -389,7 +390,11 @@
             // set the message data
             _qpidityMessage.clearData();
             // we need to do a flip
-            _messageData.flip();
+            //_messageData.flip();
+            
+            System.out.println("_messageData POS " + _messageData.position());
+            System.out.println("_messageData limit " + _messageData.limit());
+            
             _qpidityMessage.appendData(_messageData);
             
_qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties);
         }


Reply via email to