Author: kpvdr
Date: Thu Feb 15 08:35:43 2007
New Revision: 507993
URL: http://svn.apache.org/viewvc?view=rev&rev=507993
Log:
Fix for RecoverTest which was failing because the redelivered flag was
disconnected
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=507993&r1=507992&r2=507993
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Thu Feb 15 08:35:43 2007
@@ -395,6 +395,7 @@
{
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
+ mtb.redelivered = msg.isRedelivered();
ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize());
for (ByteBuffer bb : msg.getContents())
{
@@ -441,6 +442,7 @@
{
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
+ mtb.redelivered = msg.isRedelivered();
mtb.body = new Content(Content.TypeEnum.REF_T, refId);
_session.writeRequest(_channelId, mtb, listener);
for (ByteBuffer bb : msg.getContents())
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=507993&r1=507992&r2=507993
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Thu Feb 15 08:35:43 2007
@@ -550,7 +550,7 @@
try
{
AbstractJMSMessage jmsMessage =
_messageFactory.createMessage(messageFrame.getDeliveryTag(),
-
false,
+
messageFrame.getRedeliveredFlag(),
messageFrame.getMessageHeaders(),
messageFrame.getContents());
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=507993&r1=507992&r2=507993
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
Thu Feb 15 08:35:43 2007
@@ -68,7 +68,7 @@
messageHeaders.setDeliveryMode(transferBody.getDeliveryMode());
messageHeaders.setJMSHeaders(transferBody.getApplicationHeaders());
- final UnprocessedMessage msg = new
UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), messageHeaders);
+ final UnprocessedMessage msg = new
UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), messageHeaders,
transferBody.getRedelivered());
if(transferBody.getBody().getContentType() ==
Content.TypeEnum.INLINE_T)
{
@@ -78,7 +78,7 @@
else
{
String referenceId = new
String(transferBody.getBody().getContentAsByteArray());
-
protocolSession.deliverMessageToAMQSession(evt.getChannelId(),referenceId);
+ protocolSession.deliverMessageToAMQSession(evt.getChannelId(),
referenceId);
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=507993&r1=507992&r2=507993
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
Thu Feb 15 08:35:43 2007
@@ -37,20 +37,23 @@
private int channelId;
private List<byte[]> contents = new LinkedList();
private long deliveryTag;
+ private boolean redeliveredFlag;
private MessageHeaders messageHeaders;
- public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders
messageHeaders)
+ public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders
messageHeaders, boolean redeliveredFlag)
{
this.channelId = channelId;
this.deliveryTag = deliveryTag;
this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
}
- public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders
messageHeaders, byte[] content)
+ public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders
messageHeaders, byte[] content, boolean redeliveredFlag)
{
this.channelId = channelId;
this.deliveryTag = deliveryTag;
this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
addContent(content);
}
@@ -78,6 +81,11 @@
public long getDeliveryTag()
{
return deliveryTag;
+ }
+
+ public boolean getRedeliveredFlag()
+ {
+ return redeliveredFlag;
}
public MessageHeaders getMessageHeaders()