Author: arnaudsimon
Date: Fri Jan 4 05:50:19 2008
New Revision: 608840
URL: http://svn.apache.org/viewvc?rev=608840&view=rev
Log:
cashed headers: see QPID-720
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=608840&r1=608839&r2=608840&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Fri Jan 4 05:50:19 2008
@@ -27,6 +27,7 @@
import org.apache.qpidity.njms.ExceptionHelper;
import org.apache.qpidity.nclient.util.ByteBufferMessage;
import org.apache.qpidity.transport.ReplyTo;
+import org.apache.qpidity.transport.DeliveryProperties;
import javax.jms.Message;
import javax.jms.JMSException;
@@ -80,30 +81,56 @@
}
}
+ DeliveryProperties deliveryProp =
message.get010Message().getDeliveryProperties();
// set the delivery properties
if (!_disableTimestamps)
{
final long currentTime = System.currentTimeMillis();
-
message.get010Message().getDeliveryProperties().setTimestamp(currentTime);
+ deliveryProp.setTimestamp(currentTime);
if (timeToLive > 0)
{
-
message.get010Message().getDeliveryProperties().setExpiration(currentTime +
timeToLive);
+ deliveryProp.setExpiration(currentTime + timeToLive);
+ message.setJMSExpiration(currentTime + timeToLive);
}
else
{
-
message.get010Message().getDeliveryProperties().setExpiration(0);
+ deliveryProp.setExpiration(0);
+ message.setJMSExpiration(0);
}
-
origMessage.setJMSTimestamp(message.get010Message().getDeliveryProperties().getTimestamp());
+ message.setJMSTimestamp(currentTime);
}
- message.get010Message().getDeliveryProperties().setDeliveryMode((byte)
deliveryMode);
- message.get010Message().getDeliveryProperties().setPriority((byte)
priority);
-
message.get010Message().getDeliveryProperties().setExchange(destination.getExchangeName().toString());
-
message.get010Message().getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString());
-
origMessage.setJMSPriority(message.get010Message().getDeliveryProperties().getPriority());
-
origMessage.setJMSExpiration(message.get010Message().getDeliveryProperties().getExpiration());
- origMessage.setJMSMessageID(message.getJMSMessageID());
- origMessage.setJMSDeliveryMode(deliveryMode);
+ if (deliveryProp.getDeliveryMode() != deliveryMode)
+ {
+ deliveryProp.setDeliveryMode((byte) deliveryMode);
+ message.setJMSDeliveryMode(deliveryMode);
+ }
+ if (deliveryProp.getPriority() != priority)
+ {
+ deliveryProp.setPriority((byte) priority);
+ message.setJMSPriority(priority);
+ }
+ String excahngeName = destination.getExchangeName().toString();
+ if ( deliveryProp.getExchange() == null || !
deliveryProp.getExchange().equals(excahngeName))
+ {
+ deliveryProp.setExchange(excahngeName);
+ }
+ String routingKey = destination.getRoutingKey().toString();
+ if (deliveryProp.getRoutingKey() == null || !
deliveryProp.getRoutingKey().equals(routingKey))
+ {
+ deliveryProp.setRoutingKey(routingKey);
+ }
+
+ if (message != origMessage)
+ {
+ _logger.debug("Updating original message");
+ origMessage.setJMSPriority(message.getJMSPriority());
+ origMessage.setJMSTimestamp(message.getJMSTimestamp());
+ _logger.debug("Setting JMSExpiration:" +
message.getJMSExpiration());
+ origMessage.setJMSExpiration(message.getJMSExpiration());
+ origMessage.setJMSMessageID(message.getJMSMessageID());
+ origMessage.setJMSDeliveryMode(deliveryMode);
+ }
BasicContentHeaderProperties contentHeaderProperties =
message.getContentHeaderProperties();
if (contentHeaderProperties.reset())
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=608840&r1=608839&r2=608840&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
Fri Jan 4 05:50:19 2008
@@ -186,7 +186,7 @@
* @see org.apache.qpidity.transport.DeliveryProperties
* @see org.apache.qpidity.transport.MessageProperties
*/
- public void header(Struct... headers);
+ public Header header(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=608840&r1=608839&r2=608840&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
Fri Jan 4 05:50:19 2008
@@ -74,7 +74,17 @@
// therefore reading the content in one shot.
ByteBuffer data = msg.readData();
super.messageTransfer(destination, confirmMode, acquireMode);
- super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
+ // super.header(msg.getDeliveryProperties(),msg.getMessageProperties()
);
+ if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty()
|| msg.getMessageProperties().isDirty() )
+ {
+ msg.setHeader(
super.header(msg.getDeliveryProperties(),msg.getMessageProperties()) );
+ msg.getDeliveryProperties().setDirty(false);
+ msg.getMessageProperties().setDirty(false);
+ }
+ else
+ {
+ super.header(msg.getHeader());
+ }
data( data );
endData();
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java?rev=608840&r1=608839&r2=608840&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
Fri Jan 4 05:50:19 2008
@@ -7,6 +7,7 @@
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
import org.apache.qpidity.api.Message;
/**
@@ -27,6 +28,15 @@
private DeliveryProperties _currentDeliveryProps;
private MessageProperties _currentMessageProps;
private long _transferId;
+ private Header _header;
+
+ public void setHeader(Header header) {
+ _header = header;
+ }
+
+ public Header getHeader() {
+ return _header;
+ }
public ByteBufferMessage()
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java?rev=608840&r1=608839&r2=608840&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
Fri Jan 4 05:50:19 2008
@@ -9,6 +9,7 @@
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
import org.apache.qpidity.api.Message;
/**
@@ -50,6 +51,14 @@
{
_chunkSize = (int)_fileSize;
}
+ }
+
+ public void setHeader(Header header) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public Header getHeader() {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
}
public void readData(byte[] target) throws IOException
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java?rev=608840&r1=608839&r2=608840&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
Fri Jan 4 05:50:19 2008
@@ -7,6 +7,7 @@
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
import org.apache.qpidity.api.Message;
public class StreamingMessage extends ReadOnlyMessage implements Message
@@ -14,7 +15,15 @@
SocketChannel _socChannel;
private int _chunkSize;
private ByteBuffer _readBuf;
-
+
+ public Header getHeader() {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public void setHeader(Header header) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties
deliveryProperties,MessageProperties messageProperties)throws IOException
{
_messageProperties = messageProperties;