Author: arnaudsimon
Date: Wed Aug 15 07:29:56 2007
New Revision: 566187
URL: http://svn.apache.org/viewvc?view=rev&rev=566187
Log:
Added foreign message support
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
Wed Aug 15 07:29:56 2007
@@ -27,7 +27,12 @@
private DeliveryProperties _currentDeliveryProps;
private MessageProperties _currentMessageProps;
private long _transferId;
-
+
+ public ByteBufferMessage()
+ {
+
+ }
+
public ByteBufferMessage(long transferId)
{
_transferId = transferId;
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
Wed Aug 15 07:29:56 2007
@@ -17,7 +17,13 @@
*/
package org.apache.qpidity.jms;
+import org.apache.qpidity.jms.message.QpidMessage;
+import org.apache.qpidity.jms.message.MessageHelper;
+import org.apache.qpidity.jms.message.MessageImpl;
+import org.apache.qpidity.QpidException;
+
import javax.jms.*;
+import java.util.UUID;
/**
* Implements MessageProducer
@@ -304,13 +310,63 @@
{
throw new IllegalArgumentException("Time to live must be
non-negative - supplied value was " + timeToLive);
}
+ // Only get current time if required
+ long currentTime = Long.MIN_VALUE;
+ if (!((timeToLive == 0) && _disableTimestamps))
+ {
+ currentTime = System.currentTimeMillis();
+ }
+ // the messae UID
+ String uid = (_disableMessageId) ? "MSG_ID_DISABLED" :
UUID.randomUUID().toString();
+ MessageImpl qpidMessage = null;
// check that the message is not a foreign one
- // todo
- // set the properties
-
- // todo
-
- // dispatch it
+ try
+ {
+ qpidMessage = (MessageImpl) message;
+ }
+ catch (ClassCastException cce)
+ {
+ // this is a foreign message
+ qpidMessage = MessageHelper.transformMessage(message);
+ // set message's properties in case they are queried after send.
+ message.setJMSDestination(destination);
+ message.setJMSDeliveryMode(deliveryMode);
+ message.setJMSPriority(priority);
+ message.setJMSMessageID(uid);
+ if (timeToLive != 0)
+ {
+ message.setJMSExpiration(timeToLive + currentTime);
+ _logger.debug("Setting JMSExpiration:" +
message.getJMSExpiration());
+ }
+ else
+ {
+ message.setJMSExpiration(timeToLive);
+ }
+ message.setJMSTimestamp(currentTime);
+ }
+ // set the message properties
+ qpidMessage.setJMSDestination(destination);
+ qpidMessage.setJMSMessageID(uid);
+ qpidMessage.setJMSDeliveryMode(deliveryMode);
+ qpidMessage.setJMSPriority(priority);
+ if (timeToLive != 0)
+ {
+ qpidMessage.setJMSExpiration(timeToLive + currentTime);
+ }
+ else
+ {
+ qpidMessage.setJMSExpiration(timeToLive);
+ }
+ qpidMessage.setJMSTimestamp(currentTime);
+ // call beforeMessageDispatch
+ try
+ {
+ qpidMessage.beforeMessageDispatch();
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
// todo
getSession().getQpidSession().messageTransfer(((DestinationImpl)
destination).getExchangeName(), message, Option);
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
Wed Aug 15 07:29:56 2007
@@ -22,7 +22,6 @@
import org.apache.qpidity.jms.message.*;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.RangeSet;
-import org.apache.qpid.client.message.*;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -435,7 +434,6 @@
// TODO: messageID is a string but range need a long???
// ranges.add(message.getMessageID());
getQpidSession().messageRelease(ranges);
- // TODO We can be a little bit cleverer and build a set of ranges
}
}
@@ -1040,7 +1038,6 @@
// TODO: messageID is a string but range need a long???
// ranges.add(message.getMessageID());
getQpidSession().messageAcknowledge(ranges);
- // TODO We can be a little bit cleverer and build a set of
ranges
}
//empty the list of unack messages
_unacknowledgedMessages.clear();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
Wed Aug 15 07:29:56 2007
@@ -65,8 +65,9 @@
* Constructor used by MessageFactory
*
* @param message The new qpid message.
+ * @throws QpidException In case of problem when receiving the message
body.
*/
- protected BytesMessageImpl(org.apache.qpidity.api.Message message)
+ protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws
QpidException
{
super(message);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
Wed Aug 15 07:29:56 2007
@@ -54,8 +54,9 @@
* Constructor used by MessageFactory
*
* @param message The new qpid message.
+ * @throws QpidException In case of IO problem when reading the received
message.
*/
- protected MapMessageImpl(org.apache.qpidity.api.Message message)
+ protected MapMessageImpl(org.apache.qpidity.api.Message message) throws
QpidException
{
super(message);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java
Wed Aug 15 07:29:56 2007
@@ -17,13 +17,12 @@
*/
package org.apache.qpidity.jms.message;
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
+import javax.jms.*;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.util.Enumeration;
/**
- *
* This is an helper class for performing data convertion
*/
public class MessageHelper
@@ -106,7 +105,7 @@
return result;
}
- /**
+ /**
* Convert an object into a int value
*
* @param obj The object that may contain int value
@@ -127,12 +126,12 @@
else
{
throw new MessageFormatException("int property type convertion
error",
- "Messasge property type
convertion error");
+ "Messasge property type
convertion error");
}
return result;
}
- /**
+ /**
* Convert an object into a long value
*
* @param obj The object that may contain long value
@@ -154,7 +153,7 @@
else
{
throw new MessageFormatException("long property type convertion
error",
- "Messasge property type
convertion error");
+ "Messasge property type
convertion error");
}
return result;
}
@@ -180,7 +179,7 @@
else
{
throw new MessageFormatException("float property type convertion
error",
- "Messasge property type
convertion error");
+ "Messasge property type
convertion error");
}
return result;
}
@@ -206,8 +205,8 @@
else
{
throw new MessageFormatException("double property type convertion
error",
- "Messasge property type
convertion error");
- }
+ "Messasge property type
convertion error");
+ }
return result;
}
@@ -223,17 +222,17 @@
char result;
if (obj instanceof Character)
{
- result = ((Character) obj).charValue();
+ result = (Character) obj;
}
else
{
throw new MessageFormatException("char property type convertion
error",
- "Messasge property type
convertion error");
+ "Messasge property type
convertion error");
}
return result;
}
- /**
+ /**
* Convert an object into a String value
*
* @param obj The object that may contain String value
@@ -259,7 +258,7 @@
* @param value object for inspection
* @return true if object represent Java primitive type; false otherwise
*/
- public static boolean isPrimitive(Object value) throws JMSException
+ public static boolean isPrimitive(Object value)
{
// Innocent till proven guilty
boolean isPrimitive = true;
@@ -269,4 +268,189 @@
}
return isPrimitive;
}
+
+ /**
+ * Transform a foreign message into an equivalent QPID representation.
+ *
+ * @param message The foreign message to be converted.
+ * @return A native message.
+ * @throws JMSException In case of problem when converting the message.
+ */
+ public static MessageImpl transformMessage(Message message) throws
JMSException
+ {
+ MessageImpl messageImpl;
+
+ if (message instanceof BytesMessage)
+ {
+ messageImpl = transformBytesMessage((BytesMessage) message);
+ }
+ else if (message instanceof MapMessage)
+ {
+ messageImpl = transformMapMessage((MapMessage) message);
+ }
+ else if (message instanceof ObjectMessage)
+ {
+ messageImpl = transformObjectMessage((ObjectMessage) message);
+ }
+ else if (message instanceof StreamMessage)
+ {
+ messageImpl = transformStreamMessage((StreamMessage) message);
+ }
+ else if (message instanceof TextMessage)
+ {
+ messageImpl = transformTextMessage((TextMessage) message);
+ }
+ else
+ {
+ messageImpl = new MessageImpl();
+ }
+ transformHeaderAndProperties(message, messageImpl);
+ return messageImpl;
+ }
+
+ //---- Private methods
+ /**
+ * Exposed JMS defined properties on converted message:
+ * JMSDestination - we don't set here
+ * JMSDeliveryMode - we don't set here
+ * JMSExpiration - we don't set here
+ * JMSPriority - we don't set here
+ * JMSMessageID - we don't set here
+ * JMSTimestamp - we don't set here
+ * JMSCorrelationID - set
+ * JMSReplyTo - set
+ * JMSType - set
+ * JMSRedlivered - we don't set here
+ *
+ * @param message The foreign message to be converted.
+ * @param nativeMsg A native Qpid message.
+ * @throws JMSException In case of problem when converting the message.
+ */
+ private static void transformHeaderAndProperties(Message message,
MessageImpl nativeMsg) throws JMSException
+ {
+ //Set the correlation ID
+ String correlationID = message.getJMSCorrelationID();
+ if (correlationID != null)
+ {
+ nativeMsg.setJMSCorrelationID(correlationID);
+ }
+ //Set JMS ReplyTo
+ if (message.getJMSReplyTo() != null)
+ {
+ nativeMsg.setJMSReplyTo(message.getJMSReplyTo());
+ }
+ //Set JMS type
+ String jmsType = message.getJMSType();
+ if (jmsType != null)
+ {
+ nativeMsg.setJMSType(jmsType);
+ }
+ // Sets all non-JMS defined properties on converted message
+ Enumeration propertyNames = message.getPropertyNames();
+ while (propertyNames.hasMoreElements())
+ {
+ String propertyName = String.valueOf(propertyNames.nextElement());
+ if (!propertyName.startsWith("JMSX_"))
+ {
+ Object value = message.getObjectProperty(propertyName);
+ nativeMsg.setObjectProperty(propertyName, value);
+ }
+ }
+ }
+
+ /**
+ * Transform a BytesMessage.
+ *
+ * @param bytesMessage a BytesMessage to be converted.
+ * @return a native BytesMessage.
+ * @throws JMSException In case of problem when converting the message.
+ */
+ private static BytesMessageImpl transformBytesMessage(BytesMessage
bytesMessage) throws JMSException
+ {
+ //reset the BytesMessage (makes the body read-only and repositions
+ // the stream of bytes to the beginning
+ bytesMessage.reset();
+ BytesMessageImpl nativeMsg = new BytesMessageImpl();
+ byte[] buf = new byte[1024];
+ int len;
+ while ((len = bytesMessage.readBytes(buf)) != -1)
+ {
+ nativeMsg.writeBytes(buf, 0, len);
+ }
+ return nativeMsg;
+ }
+
+ /**
+ * Transform a MapMessage.
+ *
+ * @param mapMessage a MapMessage to be converted.
+ * @return a native MapMessage.
+ * @throws JMSException In case of problem when converting the message.
+ */
+ private static MapMessageImpl transformMapMessage(MapMessage mapMessage)
throws JMSException
+ {
+ MapMessageImpl nativeMsg = new MapMessageImpl();
+ Enumeration mapNames = mapMessage.getMapNames();
+ while (mapNames.hasMoreElements())
+ {
+ String name = (String) mapNames.nextElement();
+ nativeMsg.setObject(name, mapMessage.getObject(name));
+ }
+ return nativeMsg;
+ }
+
+ /**
+ * Transform an ObjectMessage.
+ *
+ * @param objectMessage a ObjectMessage to be converted.
+ * @return a native ObjectMessage.
+ * @throws JMSException In case of problem when converting the message.
+ */
+ private static ObjectMessageImpl transformObjectMessage(ObjectMessage
objectMessage) throws JMSException
+ {
+ ObjectMessageImpl nativeMsg = new ObjectMessageImpl();
+ nativeMsg.setObject(objectMessage.getObject());
+ return nativeMsg;
+ }
+
+ /**
+ * Transform a StreamMessage.
+ *
+ * @param streamMessage a StreamMessage to be converted.
+ * @return a native StreamMessage.
+ * @throws JMSException In case of problem when converting the message.
+ */
+ private static StreamMessageImpl transformStreamMessage(StreamMessage
streamMessage) throws JMSException
+ {
+ StreamMessageImpl nativeMsg = new StreamMessageImpl();
+ try
+ {
+ //reset the stream message
+ streamMessage.reset();
+ while (true)
+ {
+ nativeMsg.writeObject(streamMessage.readObject());
+ }
+ }
+ catch (MessageEOFException e)
+ {
+ // we're at the end so don't mind the exception
+ }
+ return nativeMsg;
+ }
+
+ /**
+ * Transform a TextMessage.
+ *
+ * @param textMessage a TextMessage to be converted.
+ * @return a native TextMessage.
+ * @throws JMSException In case of problem when converting the message.
+ */
+ private static TextMessageImpl transformTextMessage(TextMessage
textMessage) throws JMSException
+ {
+ TextMessageImpl nativeMsg = new TextMessageImpl();
+ nativeMsg.setText(textMessage.getText());
+ return nativeMsg;
+ }
+
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
Wed Aug 15 07:29:56 2007
@@ -30,6 +30,11 @@
public class MessageImpl extends QpidMessage implements Message
{
/**
+ * name used to store JMSType.
+ */
+ private static final String JMS_MESSAGE_TYPE = "JMSType";
+
+ /**
* The ReplyTo destination for this message
*/
private Destination _replyTo;
@@ -72,8 +77,9 @@
* Constructor used by MessageFactory
*
* @param message The new qpid message.
+ * @throws QpidException In case of IO problem when reading the received
message.
*/
- protected MessageImpl(org.apache.qpidity.api.Message message)
+ protected MessageImpl(org.apache.qpidity.api.Message message) throws
QpidException
{
super(message);
}
@@ -411,7 +417,7 @@
*/
public String getJMSType() throws JMSException
{
- return super.getMessageType();
+ return getStringProperty(JMS_MESSAGE_TYPE);
}
/**
@@ -422,7 +428,14 @@
*/
public void setJMSType(String type) throws JMSException
{
- super.setMessageType(type);
+ if (type == null)
+ {
+ throw new JMSException("Invalid message type null");
+ }
+ else
+ {
+ super.setProperty(JMS_MESSAGE_TYPE, type);
+ }
}
/**
@@ -843,7 +856,7 @@
* Clear out the message body. Clearing a message's body does not clear
* its header values or property entries.
* <P>If this message body was read-only, calling this method leaves
- * the message body is in the same state as an empty body in a newly
+ * the message body in the same state as an empty body in a newly
* created message.
*
* @throws JMSException If clearing this message body fails to due to some
error.
@@ -864,14 +877,15 @@
{
if (_destination == null)
{
- throw new QpidException("Invalid destination null",null, null);
+ throw new QpidException("Invalid destination null", null, null);
}
+ super.beforeMessageDispatch();
}
/**
* This method is invoked after this message is received.
*
- * @throws QpidException
+ * @throws QpidException If there is an internal error when procesing this
message.
*/
public void afterMessageReceive() throws QpidException
{
@@ -882,7 +896,6 @@
_proertiesReadOnly = true;
_readOnly = true;
-
}
/**
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
Wed Aug 15 07:29:56 2007
@@ -57,8 +57,9 @@
* Constructor used by MessageFactory
*
* @param message The new qpid message.
+ * @throws QpidException In case of IO problem when reading the received
message.
*/
- protected ObjectMessageImpl(org.apache.qpidity.api.Message message)
+ protected ObjectMessageImpl(org.apache.qpidity.api.Message message) throws
QpidException
{
super(message);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
Wed Aug 15 07:29:56 2007
@@ -25,10 +25,12 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
+import java.io.IOException;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.ReplyTo;
+import org.apache.qpidity.client.util.ByteBufferMessage;
public class QpidMessage
@@ -55,25 +57,36 @@
//-- Constructors
-
/**
* Constructor used when JMS messages are created by SessionImpl.
*/
protected QpidMessage()
{
- // TODO we need an implementation class: _qpidityMessage
+ // We us a byteBufferMessage as default
+ _qpidityMessage = new ByteBufferMessage();
_messageProperties = new HashMap<String, Object>();
+ // This is a newly created messsage so the data is empty
+ _messageData = ByteBuffer.allocate(1024);
}
/**
* Constructor used when a Qpid message is received
*
- * @param message The received message
+ * @param message The received message.
+ * @throws QpidException In case of problem when receiving the message
body.
*/
- protected QpidMessage(org.apache.qpidity.api.Message message)
+ protected QpidMessage(org.apache.qpidity.api.Message message) throws
QpidException
{
- _qpidityMessage = message;
- _messageProperties = (Map<String, Object>)
message.getMessageProperties().getApplicationHeaders();
+ try
+ {
+ _qpidityMessage = message;
+ _messageProperties = (Map<String, Object>)
message.getMessageProperties().getApplicationHeaders();
+ _messageData = _qpidityMessage.readData();
+ }
+ catch (IOException ioe)
+ {
+ throw new QpidException("IO problem when creating message",
ErrorCode.UNDEFINED, ioe);
+ }
}
//---- getters and setters.
@@ -148,6 +161,16 @@
}
/**
+ * Set the ReplyTo for this message.
+ *
+ * @param replyTo The ReplyTo for this message.
+ */
+ protected void setReplyTo(ReplyTo replyTo)
+ {
+ _qpidityMessage.getMessageProperties().setReplyTo(replyTo);
+ }
+
+ /**
* Get this message Delivery mode
* The delivery mode may be non-persistent (1) or persistent (2)
*
@@ -321,9 +344,30 @@
*/
protected void clearMessageData()
{
- _messageData = ByteBuffer.allocate(1024);
+ _messageData = ByteBuffer.allocate(1024);
}
+ /**
+ * This method is invoked before a message dispatch operation.
+ *
+ * @throws QpidException If the destination is not set
+ */
+ public void beforeMessageDispatch() throws QpidException
+ {
+ try
+ {
+ // set the message data
+ _qpidityMessage.clearData();
+ // we need to do a flip
+ _messageData.flip();
+ _qpidityMessage.appendData(_messageData);
+
_qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException("IO exception when sending message",
ErrorCode.UNDEFINED, e);
+ }
+ }
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessageImpl.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/StreamMessageImpl.java
Wed Aug 15 07:29:56 2007
@@ -17,6 +17,8 @@
*/
package org.apache.qpidity.jms.message;
+import org.apache.qpidity.QpidException;
+
import javax.jms.*;
import java.io.IOException;
import java.io.EOFException;
@@ -80,8 +82,9 @@
* Constructor used by MessageFactory
*
* @param message The new qpid message.
+ * @throws QpidException In case of problem when receiving the message
body.
*/
- protected StreamMessageImpl(org.apache.qpidity.api.Message message)
+ protected StreamMessageImpl(org.apache.qpidity.api.Message message) throws
QpidException
{
super(message);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java?view=diff&rev=566187&r1=566186&r2=566187
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
Wed Aug 15 07:29:56 2007
@@ -55,8 +55,9 @@
* Constructor used by MessageFactory
*
* @param message The new qpid message.
+ * @throws QpidException In case of IO problem when reading the received
message.
*/
- protected TextMessageImpl(org.apache.qpidity.api.Message message)
+ protected TextMessageImpl(org.apache.qpidity.api.Message message) throws
QpidException
{
super(message);
}