Author: rajith
Date: Thu Aug 23 21:06:18 2007
New Revision: 569238
URL: http://svn.apache.org/viewvc?rev=569238&view=rev
Log:
Fixed the following issues
1) TopicImpl doesn't populate the routing key properly.
The Destination Impl needs to have a routing key field (I added the
field).
For Topic The queue name is generated.
For Queue the routingkey is same as queue name.
2) QpidMessage - Calling flip on messageData resets the limit to zero in
beforeMessageDispatch(). I commented out the flip()
3) QpidMessage - setMessageData
Instead of _messageData = messageBody, I modified it to do
_messageData = messageBody.duplicate();
4) MessageActorId is not set properly - so I modified the code to set
this. This id is used for the destination
5) When creating BytesMessageImpl, in the constructor, it doesn't read from
the underlying
message impl. There for the _readIn is null and results in
MessageNotReadableException.
I added a temp solution to read and populate _readIn.
However need to revisit it later
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
Thu Aug 23 21:06:18 2007
@@ -108,7 +108,6 @@
ClientSession ssn = new ClientSession();
ssn.attach(ch);
ssn.sessionOpen(expiryInSeconds);
-
return ssn;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
Thu Aug 23 21:06:18 2007
@@ -29,7 +29,7 @@
{
for (long l = range.getLower(); l <= range.getUpper(); l++)
{
- System.out.println("Acknowleding message for : " +
super.getCommand((int) l));
+ System.out.println("Acknowleding transfer id : " + l);
super.processed(l);
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
Thu Aug 23 21:06:18 2007
@@ -67,10 +67,12 @@
}
((ClientSession)session).setRejectedMessages(struct.getTransfers());
((ClientSession)session).notifyException(new QpidException("Message
Rejected",ErrorCode.MESSAGE_REJECTED,null));
+ session.processed(struct);
}
@Override public void messageAcquired(Session session, MessageAcquired
struct)
{
((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+ session.processed(struct);
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
Thu Aug 23 21:06:18 2007
@@ -30,7 +30,8 @@
public ByteBufferMessage()
{
-
+ _currentDeliveryProps = new DeliveryProperties();
+ _currentMessageProps = new MessageProperties();
}
public ByteBufferMessage(long transferId)
@@ -70,6 +71,7 @@
public MessageProperties getMessageProperties()
{
+ System.out.println("MessageProperties is null ? " +
_currentMessageProps == null? "true":"false");
return _currentMessageProps;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
Thu Aug 23 21:06:18 2007
@@ -68,6 +68,8 @@
* Indicates whether this destination is durable
*/
protected boolean _isDurable;
+
+ protected String _routingKey;
/**
* The biding URL used to create this destiantion
@@ -79,6 +81,7 @@
protected DestinationImpl(String name) throws QpidException
{
_queueName = name;
+ _routingKey = name;
}
/**
@@ -96,6 +99,7 @@
_isAutoDelete =
Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable =
Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
_queueName = binding.getQueueName();
+ _routingKey = binding.getQueueName();
_url = binding;
}
@@ -171,6 +175,11 @@
return _isAutoDelete;
}
+ public String getRoutingKey()
+ {
+ return _routingKey;
+ }
+
/**
* Indicates whether this destination is Durable.
*
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
Thu Aug 23 21:06:18 2007
@@ -63,15 +63,16 @@
//TODO define the parameters
- protected MessageActor()
+ protected MessageActor(String messageActorID)
{
-
+ _messageActorID = messageActorID;
}
- protected MessageActor(SessionImpl session, DestinationImpl destination)
+ protected MessageActor(SessionImpl session, DestinationImpl
destination,String messageActorID)
{
_session = session;
_destination = destination;
+ _messageActorID = messageActorID;
}
//--- public methods (part of the jms public API)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
Thu Aug 23 21:06:18 2007
@@ -112,9 +112,9 @@
* @throws Exception If the MessageProducerImpl cannot be created due to
some internal error.
*/
protected MessageConsumerImpl(SessionImpl session, DestinationImpl
destination, String messageSelector,
- boolean noLocal, String subscriptionName)
throws Exception
+ boolean noLocal, String
subscriptionName,String consumerTag) throws Exception
{
- super(session, destination);
+ super(session, destination,consumerTag);
if (messageSelector != null)
{
_messageSelector = messageSelector;
@@ -167,7 +167,7 @@
}
// bind this queue with the topic exchange
getSession().getQpidSession()
- .queueBind(queueName,
ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getQpidQueueName(), null);
+ .queueBind(queueName,
ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null);
// subscribe to this topic
getSession().getQpidSession()
.messageSubscribe(queueName, getMessageActorID(),
@@ -183,6 +183,13 @@
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
+
+ // this will prevent the broker from sending more than one message
+ // When a messageListener is set the flow will be adjusted.
+ // until then we assume it's for synchronous message consumption
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
+
getSession().getQpidSession().sync();
// check for an exception
if (getSession().getCurrentException() != null)
@@ -347,12 +354,21 @@
private Message internalReceive(long timeout) throws Exception
{
checkNotClosed();
+ Message result = null;
+
if (_messageListener != null)
{
throw new javax.jms.IllegalStateException("A listener has already
been set.");
}
-
- Message result = null;
+
+ if (_incomingMessage != null)
+ {
+ System.out.println("We already had a message in the queue");
+ result = (Message) _incomingMessage;
+ _incomingMessage = null;
+ return result;
+ }
+
synchronized (_incomingMessageLock)
{
// This indicate to the delivery thread to deliver the message to
this consumer
@@ -366,11 +382,14 @@
1);
getSession().getQpidSession().messageFlush(getMessageActorID());
_messageReceived.set(false);
-
+ System.out.println("no message in the queue, issuing a flow(1)
and waiting for message");
+
//When sync() returns we know whether we have received a
message or not.
getSession().getQpidSession().sync();
+
+ System.out.println("we got returned from sync()");
//received = getSession().getQpidSession().messagesReceived();
- }
+ }
if (_messageReceived.get() && timeout < 0)
{
// this is a nowait and we havent received a message then we
must immediatly return
@@ -387,6 +406,7 @@
{
try
{
+ System.out.println("waiting for message");
_incomingMessageLock.wait(timeout);
}
catch (InterruptedException e)
@@ -479,18 +499,24 @@
// notify the waiting thread
if (_messageListener == null)
{
+ System.out.println("Received a message- onMessage in message
consumer Impl");
+
synchronized (_incomingMessageLock)
{
if (messageOk)
{
+ System.out.println("Received a message- onMessage in
message ok " + messageOk);
// we have received a proper message that we can
deliver
if (_isReceiving)
{
+ System.out.println("Received a message- onMessage
in message _isReceiving");
+
_incomingMessage = message;
_incomingMessageLock.notify();
}
else
{
+ System.out.println("Received a message- onMessage
in message releasing");
// this message has been received after a received
as returned
// we need to release it
releaseMessage(message);
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
Thu Aug 23 21:06:18 2007
@@ -58,7 +58,7 @@
//-- constructors
public MessageProducerImpl(SessionImpl session, DestinationImpl
destination)
{
- super(session, destination);
+ super(session, destination,"");
}
//--- Interface javax.jms.MessageProducer
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
Thu Aug 23 21:06:18 2007
@@ -17,10 +17,13 @@
*/
package org.apache.qpidity.jms;
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.jms.message.MessageFactory;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.util.MessageListener;
+import org.apache.qpidity.jms.message.MessageFactory;
+import org.apache.qpidity.jms.message.QpidMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,8 +32,12 @@
* This is for guarantying that asynch messages are sequentially processed
within their session.
* <p> when used synchonously, messages are dispatched to the receiver itself.
*/
-public class QpidMessageListener implements MessageListener
+public class QpidMessageListener implements MessageListener, Runnable
{
+
+ // temp solution
+ LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>();
+
/**
* Used for debugging.
*/
@@ -50,37 +57,53 @@
public QpidMessageListener(MessageConsumerImpl consumer)
{
_consumer = consumer;
+ Thread t = new Thread(this);
+ t.start();
}
-
- //---- org.apache.qpidity.MessagePartListener API
- /**
- * Deliver a message to the listener.
- *
- * @param message The message delivered to the listner.
- */
- public void onMessage(Message message)
+
+ public void run()
{
try
{
- // to be used with flush
- _consumer.notifyMessageReceived();
-
- //convert this message into a JMS one
- QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
- // if consumer is asynchronous then send this message to its
session.
- if( _consumer.getMessageListener() != null )
+ while(true)
{
-
_consumer.getSession().dispatchMessage(_consumer.getMessageActorID(),
jmsMessage);
- }
- else
- {
- // deliver this message to the consumer itself
- _consumer.onMessage(jmsMessage);
+ System.out.println("trying to take a message message");
+ Message message = _queue.take();
+
+ // to be used with flush
+ System.out.println("processing the message");
+ _consumer.notifyMessageReceived();
+
+ //convert this message into a JMS one
+ QpidMessage jmsMessage =
MessageFactory.getQpidMessage(message);
+ // if consumer is asynchronous then send this message to its
session.
+ if( _consumer.getMessageListener() != null )
+ {
+
_consumer.getSession().dispatchMessage(_consumer.getMessageActorID(),
jmsMessage);
+ }
+ else
+ {
+ // deliver this message to the consumer itself
+ _consumer.onMessage(jmsMessage);
+ }
}
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage());
}
+ }
+
+ //---- org.apache.qpidity.MessagePartListener API
+ /**
+ * Deliver a message to the listener.
+ *
+ * @param message The message delivered to the listner.
+ */
+ public void onMessage(Message message)
+ {
+ System.out.println("Received a message");
+ _queue.offer(message);
+ System.out.println("Added queue to the message");
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
Thu Aug 23 21:06:18 2007
@@ -80,9 +80,9 @@
* @param messageSelector only messages with properties matching the
message selector expression are delivered.
* @throws Exception In case of internal problem when creating this
browser.
*/
- protected QueueBrowserImpl(SessionImpl session, Queue queue, String
messageSelector) throws Exception
+ protected QueueBrowserImpl(SessionImpl session, Queue queue, String
messageSelector,String consumerTag) throws Exception
{
- super(session, (DestinationImpl) queue);
+ super(session, (DestinationImpl) queue,consumerTag);
// this is an array representing a batch of messages for this browser.
_messages = new Message[_maxbatchlength];
if (messageSelector != null)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
Thu Aug 23 21:06:18 2007
@@ -35,9 +35,9 @@
* @param messageSelector the message selector for this QueueReceiverImpl.
* @throws Exception If the QueueReceiverImpl cannot be created due to
some internal error.
*/
- protected QueueReceiverImpl(SessionImpl session, Queue queue, String
messageSelector) throws Exception
+ protected QueueReceiverImpl(SessionImpl session, Queue queue, String
messageSelector,String consumerTag) throws Exception
{
- super(session, (DestinationImpl) queue, messageSelector, false, null);
+ super(session, (DestinationImpl) queue, messageSelector, false,
null,consumerTag);
}
//--- Interface QueueReceiver
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
Thu Aug 23 21:06:18 2007
@@ -128,7 +128,7 @@
QueueReceiver receiver;
try
{
- receiver = new QueueReceiverImpl(this, queue, messageSelector);
+ receiver = new QueueReceiverImpl(this, queue,
messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
Thu Aug 23 21:06:18 2007
@@ -25,12 +25,11 @@
import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Implementation of the JMS Session interface
@@ -123,6 +122,12 @@
* This session connection
*/
private ConnectionImpl _connection;
+
+ /**
+ * This will be used as the message actor id
+ * This in turn will be set as the destination
+ */
+ protected AtomicInteger _consumerTag = new AtomicInteger();
//--- Constructor
/**
@@ -594,7 +599,7 @@
MessageConsumerImpl consumer;
try
{
- consumer = new MessageConsumerImpl(this, (DestinationImpl)
destination, messageSelector, noLocal, null);
+ consumer = new MessageConsumerImpl(this, (DestinationImpl)
destination, messageSelector, noLocal,
null,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
@@ -721,7 +726,7 @@
try
{
subscriber = new TopicSubscriberImpl(this, topic, messageSelector,
noLocal,
- _connection.getClientID() +
":" + name);
+ _connection.getClientID() +
":" + name,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
@@ -765,7 +770,7 @@
QueueBrowserImpl browser;
try
{
- browser = new QueueBrowserImpl(this, queue, messageSelector);
+ browser = new QueueBrowserImpl(this, queue,
messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
@@ -1114,7 +1119,7 @@
*/
protected void testQpidException() throws QpidException
{
- _qpidSession.sync();
+ //_qpidSession.sync();
QpidException qe = getCurrentException();
if (qe != null)
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
Thu Aug 23 21:06:18 2007
@@ -42,6 +42,7 @@
{
super(name);
_queueName = "Topic-" + UUID.randomUUID();
+ _routingKey = name;
_destinationName = name;
_exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
_exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -61,6 +62,7 @@
{
super(name);
_queueName = "Topic-" + UUID.randomUUID();
+ _routingKey = name;
_destinationName = name;
_exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
_exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -116,7 +118,11 @@
// test if this exchange exist on the broker
session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType,
null, null, Option.PASSIVE);
// wait for the broker response
+ System.out.println("Checking for exchange");
+
session.getQpidSession().sync();
+
+ System.out.println("Calling sync()");
// todo get the exception
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
Thu Aug 23 21:06:18 2007
@@ -144,7 +144,7 @@
TopicSubscriber topicSubscriber;
try
{
- topicSubscriber = new TopicSubscriberImpl(this, topic,
messageSelector, noLocal, null);
+ topicSubscriber = new TopicSubscriberImpl(this, topic,
messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet()));
}
catch (Exception e)
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
Thu Aug 23 21:06:18 2007
@@ -39,9 +39,9 @@
* @throws Exception If the TopicSubscriberImpl cannot be created due to
internal error.
*/
protected TopicSubscriberImpl(SessionImpl session, Topic topic, String
messageSelector, boolean noLocal,
- String subscriptionName) throws Exception
+ String subscriptionName,String consumerTag)
throws Exception
{
- super(session, (DestinationImpl) topic, messageSelector, noLocal,
subscriptionName);
+ super(session, (DestinationImpl) topic, messageSelector, noLocal,
subscriptionName,consumerTag);
}
//--- javax.jms.TopicSubscriber interface
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
Thu Aug 23 21:06:18 2007
@@ -70,6 +70,17 @@
protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws
QpidException
{
super(message);
+ try
+ {
+ ByteBuffer b = message.readData();
+ byte[] a = new byte[b.limit()];
+ b.get(a);
+ _dataIn = new DataInputStream(new ByteArrayInputStream(a));
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
}
//--- BytesMessage API
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?rev=569238&r1=569237&r2=569238&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
Thu Aug 23 21:06:18 2007
@@ -64,6 +64,7 @@
{
// We us a byteBufferMessage as default
_qpidityMessage = new ByteBufferMessage();
+ System.out.println("Creating a bytes message");
_messageProperties = new HashMap<String, Object>();
// This is a newly created messsage so the data is empty
_messageData = ByteBuffer.allocate(1024);
@@ -325,8 +326,8 @@
* @param messageBody The buffer containing this message data
*/
protected void setMessageData(ByteBuffer messageBody)
- {
- _messageData = messageBody;
+ {
+ _messageData = messageBody.duplicate();
}
/**
@@ -389,7 +390,11 @@
// set the message data
_qpidityMessage.clearData();
// we need to do a flip
- _messageData.flip();
+ //_messageData.flip();
+
+ System.out.println("_messageData POS " + _messageData.position());
+ System.out.println("_messageData limit " + _messageData.limit());
+
_qpidityMessage.appendData(_messageData);
_qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties);
}