Author: rajith
Date: Thu Feb 15 15:16:38 2007
New Revision: 508233
URL: http://svn.apache.org/viewvc?view=rev&rev=508233
Log:
added support for reference case
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=508233&r1=508232&r2=508233
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Thu Feb 15 15:16:38 2007
@@ -343,6 +343,7 @@
long maxFrameSize = _session.getFrameMax();
Iterable<ByteBuffer> contentItr = msg.getContents();
if (msg.getSize() > maxFrameSize)
+ //if(true)
{
Iterator<ByteBuffer> cItr = contentItr.iterator();
if (cItr.next().limit() > maxFrameSize) // First chunk should
equal incoming frame size
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=508233&r1=508232&r2=508233
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Thu Feb 15 15:16:38 2007
@@ -328,7 +328,7 @@
}
public int getPrefetchHigh()
- {
+ {
return _prefetchHigh;
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java?view=diff&rev=508233&r1=508232&r2=508233
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
Thu Feb 15 15:16:38 2007
@@ -47,6 +47,7 @@
try
{
protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
+ System.out.println("Message.appened()-->Appending
message content to body");
}
catch (Exception e)
{
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java?view=diff&rev=508233&r1=508232&r2=508233
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
Thu Feb 15 15:16:38 2007
@@ -46,6 +46,8 @@
{
MessageCloseBody body = (MessageCloseBody)evt.getMethod();
String referenceId = new String(body.getReference());
+ System.out.println("Message.closing()-->Handing message to
session");
+
protocolSession.deliverMessageToAMQSession(evt.getChannelId(),
referenceId);
_logger.debug("Method Close Body received, notify session to
accept unprocessed message");
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java?view=diff&rev=508233&r1=508232&r2=508233
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
Thu Feb 15 15:16:38 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -44,7 +45,11 @@
public void methodReceived (AMQStateManager stateManager,
AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- // TODO
+ final UnprocessedMessage msg = new
UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), null, false);
+ String referenceId = new
String(((MessageOpenBody)evt.getMethod()).getReference());
+ protocolSession.unprocessedMessageReceived(referenceId, msg);
+
+ System.out.println("Message.open()-->Adding message to map with ref");
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=508233&r1=508232&r2=508233
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
Thu Feb 15 15:16:38 2007
@@ -78,7 +78,7 @@
else
{
String referenceId = new
String(transferBody.getBody().getContentAsByteArray());
- protocolSession.deliverMessageToAMQSession(evt.getChannelId(),
referenceId);
+
protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId,
messageHeaders,transferBody.getRedelivered());
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=508233&r1=508232&r2=508233
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
Thu Feb 15 15:16:38 2007
@@ -56,7 +56,7 @@
this.redeliveredFlag = redeliveredFlag;
addContent(content);
}
-
+
public void addContent(byte[] content)
{
contents.add(content);
@@ -97,4 +97,12 @@
{
return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" +
bytesReceived + "; deliveryTag=" + deliveryTag + "; MsgHdrs=" + messageHeaders
+ "Num contents=" + contents.size() + "; First content=" + new
String(contents.get(0));
}
+
+ public void setMessageHeaders(MessageHeaders messageHeaders) {
+ this.messageHeaders = messageHeaders;
+ }
+
+ public void setRedeliveredFlag(boolean redeliveredFlag) {
+ this.redeliveredFlag = redeliveredFlag;
+ }
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=508233&r1=508232&r2=508233
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Thu Feb 15 15:16:38 2007
@@ -36,7 +36,9 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.message.MessageHeaders;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQRequestBody;
@@ -52,9 +54,7 @@
import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.AMQProtocolWriter;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
/**
* Wrapper for protocol session that provides type-safe access to session
attributes.
@@ -280,6 +280,12 @@
String referenceId = new String(appendBody.getReference());
UnprocessedMessage msg =
(UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
msg.addContent(appendBody.bytes);
+ }
+
+ public void messageTransferBodyReceivedForReferenceCase(String
referenceId,MessageHeaders messageHeaders,boolean redilivered){
+ UnprocessedMessage msg =
(UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+ msg.setMessageHeaders(messageHeaders);
+ msg.setRedeliveredFlag(redilivered);
}
public void messageRequestBodyReceived(int channelId, AMQRequestBody
requestBody) throws Exception
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java?view=diff&rev=508233&r1=508232&r2=508233
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
Thu Feb 15 15:16:38 2007
@@ -68,8 +68,21 @@
public static void main(String[] args){
PubSubTwoConnectionTest test = new PubSubTwoConnectionTest();
try {
- test.setUp();
- test.testTwoConnections();
+ //test.setUp();
+ //test.testTwoConnections();
+ int a = 5;
+
+ System.out.println(a++);
+ System.out.println(a);
+ System.out.println(++a);
+
+ int b = ++a;
+ int c = a++;
+
+ System.out.println(b);
+ System.out.println(c);
+ System.out.println(a);
+
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();