Author: arnaudsimon
Date: Fri Aug 24 01:45:43 2007
New Revision: 569298
URL: http://svn.apache.org/viewvc?rev=569298&view=rev
Log: (empty)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
Fri Aug 24 01:45:43 2007
@@ -96,7 +96,7 @@
* Never exceed MAX_MESSAGE_TRANSFERRED
*/
private int _messageAsyncrhonouslyReceived = 0;
-
+
private AtomicBoolean _messageReceived = new AtomicBoolean();
//----- Constructors
@@ -109,12 +109,13 @@
* @param noLocal If true inhibits the delivery of messages
published by its own connection.
* @param subscriptionName Name of the subscription if this is to be
created as a durable subscriber.
* If this value is null, a non-durable
subscription is created.
+ * @param consumerTag This consumer ID.
* @throws Exception If the MessageConsumerImpl cannot be created due to
some internal error.
*/
protected MessageConsumerImpl(SessionImpl session, DestinationImpl
destination, String messageSelector,
- boolean noLocal, String
subscriptionName,String consumerTag) throws Exception
+ boolean noLocal, String subscriptionName,
String consumerTag) throws Exception
{
- super(session, destination,consumerTag);
+ super(session, destination, consumerTag);
if (messageSelector != null)
{
_messageSelector = messageSelector;
@@ -183,13 +184,13 @@
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
-
+
// this will prevent the broker from sending more than one message
// When a messageListener is set the flow will be adjusted.
// until then we assume it's for synchronous message consumption
getSession().getQpidSession()
- .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
-
+ .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+
getSession().getQpidSession().sync();
// check for an exception
if (getSession().getCurrentException() != null)
@@ -355,12 +356,12 @@
{
checkNotClosed();
Message result = null;
-
+
if (_messageListener != null)
{
throw new javax.jms.IllegalStateException("A listener has already
been set.");
}
-
+
if (_incomingMessage != null)
{
System.out.println("We already had a message in the queue");
@@ -368,7 +369,7 @@
_incomingMessage = null;
return result;
}
-
+
synchronized (_incomingMessageLock)
{
// This indicates to the delivery thread to deliver the message to
this consumer
@@ -383,13 +384,13 @@
getSession().getQpidSession().messageFlush(getMessageActorID());
_messageReceived.set(false);
System.out.println("no message in the queue, issuing a flow(1)
and waiting for message");
-
+
//When sync() returns we know whether we have received a
message or not.
getSession().getQpidSession().sync();
-
- System.out.println("we got returned from sync()");
+
+ System.out.println("we got returned from sync()");
//received = getSession().getQpidSession().messagesReceived();
- }
+ }
if (_messageReceived.get() && timeout < 0)
{
// this is a nowait and we haven't received a message, so we
must immediately return
@@ -406,7 +407,7 @@
{
try
{
- System.out.println("waiting for message");
+ System.out.println("waiting for message");
_incomingMessageLock.wait(timeout);
}
catch (InterruptedException e)
@@ -427,7 +428,7 @@
// We now release any message received for this consumer
_isReceiving = false;
_isNoWaitIsReceiving = false;
- getSession().testQpidException();
+ getSession().testQpidException();
}
return result;
}
@@ -500,7 +501,7 @@
if (_messageListener == null)
{
System.out.println("Received a message- onMessage in message
consumer Impl");
-
+
synchronized (_incomingMessageLock)
{
if (messageOk)
@@ -510,7 +511,7 @@
if (_isReceiving)
{
System.out.println("Received a message- onMessage
in message _isReceiving");
-
+
_incomingMessage = message;
_incomingMessageLock.notify();
}
@@ -534,11 +535,11 @@
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
getSession().getQpidSession().messageFlush(getMessageActorID());
_messageReceived.set(false);
-
+
// When sync() returns we know whether we have
received a message or not.
- getSession().getQpidSession().sync();
-
- if (_messageReceived.get() &&
_isNoWaitIsReceiving)
+ getSession().getQpidSession().sync();
+
+ if (_messageReceived.get() && _isNoWaitIsReceiving)
{
// Right, a nowait receive is waiting for a
message
// but none can be delivered, so it needs to
return
@@ -619,7 +620,7 @@
RangeSet ranges = new RangeSet();
ranges.add(message.getMessageTransferId());
getSession().getQpidSession().messageRelease(ranges);
- getSession().testQpidException();
+ getSession().testQpidException();
}
}
@@ -667,7 +668,7 @@
getSession().testQpidException();
}
}
-
+
public void notifyMessageReceived()
{
_messageReceived.set(true);
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
Fri Aug 24 01:45:43 2007
@@ -70,17 +70,6 @@
protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws
QpidException
{
super(message);
- try
- {
- ByteBuffer b = message.readData();
- byte[] a = new byte[b.limit()];
- b.get(a);
- _dataIn = new DataInputStream(new ByteArrayInputStream(a));
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
}
//--- BytesMessage API
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
Fri Aug 24 01:45:43 2007
@@ -562,8 +562,8 @@
//-- Overwritten methods
/**
* This method is invoked before this message is dispatched.
- * <p>This class uses it to convert its text payload into a ByteBuffer
*/
+ @Override
public void beforeMessageDispatch() throws QpidException
{
try
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
Fri Aug 24 01:45:43 2007
@@ -83,7 +83,7 @@
{
super(message);
}
-
+
//---- javax.jms.Message interface
/**
* Get the message ID.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
Fri Aug 24 01:45:43 2007
@@ -31,11 +31,18 @@
import org.apache.qpidity.QpidException;
import org.apache.qpidity.ReplyTo;
import org.apache.qpidity.client.util.ByteBufferMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class QpidMessage
{
/**
+ * this QpidMessage's logger
+ */
+ private static final Logger _logger =
LoggerFactory.getLogger(QpidMessage.class);
+
+ /**
* The underlying qpidity message
*/
private org.apache.qpidity.api.Message _qpidityMessage;
@@ -326,8 +333,8 @@
* @param messageBody The buffer containing this message data
*/
protected void setMessageData(ByteBuffer messageBody)
- {
- _messageData = messageBody.duplicate();
+ {
+ _messageData = messageBody; // we shouldn't need that .duplicate();
}
/**
@@ -388,13 +395,12 @@
try
{
// set the message data
- _qpidityMessage.clearData();
- // we need to do a flip
- //_messageData.flip();
-
- System.out.println("_messageData POS " + _messageData.position());
- System.out.println("_messageData limit " + _messageData.limit());
-
+ _qpidityMessage.clearData();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("_messageData POS " + _messageData.position());
+ _logger.debug("_messageData limit " + _messageData.limit());
+ }
_qpidityMessage.appendData(_messageData);
_qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties);
}