Author: rgreig
Date: Wed Dec 13 03:01:09 2006
New Revision: 486594
URL: http://svn.apache.org/viewvc?view=rev&rev=486594
Log:
QPID-175 Patch supplied by Rob Godfrey. Now allocates a new auto-expanding buffer
for StreamMessage when clearBody is called.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?view=diff&rev=486594&r1=486593&r2=486594
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Wed Dec 13 03:01:09 2006
@@ -59,11 +59,16 @@
if (_data == null)
{
- _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
- _data.setAutoExpand(true);
+ allocateInitialBuffer();
}
}
+ private void allocateInitialBuffer()
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
+ _data.setAutoExpand(true);
+ }
+
AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader,
ByteBuffer data)
throws AMQException
{
@@ -74,7 +79,7 @@
public void clearBodyImpl() throws JMSException
{
- _data.clear();
+ allocateInitialBuffer();
}
public String toBodyString() throws JMSException
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=486594&r1=486593&r2=486594
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Wed Dec 13 03:01:09 2006
@@ -13,6 +13,7 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQHeadersExchange;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
@@ -97,8 +98,39 @@
{
assertTrue("Expected MessageEOFException: " + e, e instanceof MessageEOFException);
}
+ }
+ public void testModifyReceivedMessageExpandsBuffer() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ AMQQueue queue = new AMQQueue("testQ");
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumer.setMessageListener(new MessageListener()
+ {
-
+ public void onMessage(Message message)
+ {
+ StreamMessage sm = (StreamMessage) message;
+ try
+ {
+ sm.clearBody();
+ sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd");
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error when writing large string to received msg: " + e, e);
+ fail("Error when writing large string to received msg" + e);
+ }
+ }
+ });
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+ con.start();
+ StreamMessage sm = producerSession.createStreamMessage();
+ sm.writeInt(42);
+ mandatoryProducer.send(sm);
+ Thread.sleep(2000);
}
}