Author: rgreig
Date: Tue Dec 12 09:51:55 2006
New Revision: 486255

URL: http://svn.apache.org/viewvc?view=rev&rev=486255
Log:
QPID-174 Fix submitted by Rob Godfrey. Now performs a flip() to ensure the 
limit is set correctly.

Added:
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=486255&r1=486254&r2=486255
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
 Tue Dec 12 09:51:55 2006
@@ -505,7 +505,14 @@
         // position beyond the start
         if (_data != null)
         {
-            _data.rewind();
+            if (!_readableMessage)
+            {
+                _data.flip();
+            }
+            else
+            {
+                _data.rewind();
+            }
         }
         return _data;
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=486255&r1=486254&r2=486255
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
 Tue Dec 12 09:51:55 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,18 +20,17 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.AMQException;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
 
-import javax.jms.ObjectMessage;
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
-import javax.jms.MessageNotWriteableException;
+import javax.jms.ObjectMessage;
 import java.io.*;
-import java.nio.charset.Charset;
 import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
 
 public class JMSObjectMessage extends AbstractJMSMessage implements 
ObjectMessage
 {
@@ -94,13 +93,16 @@
             _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
             _data.setAutoExpand(true);
         }
+        else
+        {
+            _data.rewind();
+        }
         try
         {
             ObjectOutputStream out = new 
ObjectOutputStream(_data.asOutputStream());
             out.writeObject(serializable);
             out.flush();
             out.close();
-            _data.rewind();
         }
         catch (IOException e)
         {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java?view=diff&rev=486255&r1=486254&r2=486255
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
 Tue Dec 12 09:51:55 2006
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,28 +19,21 @@
  */
 package org.apache.qpid.test.unit.basic;
 
+import junit.framework.Assert;
+import junit.framework.TestCase;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.client.message.JMSObjectMessage;
-import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.client.transport.TransportConnection;
 
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.MessageNotWriteableException;
+import javax.jms.*;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import junit.framework.TestCase;
-import junit.framework.Assert;
-
 public class ObjectMessageTest extends TestCase implements MessageListener
 {
     private AMQConnection _connection;
@@ -54,6 +47,7 @@
     protected void setUp() throws Exception
     {
         super.setUp();
+        TransportConnection.createVMBroker(1);
         try
         {
             init(new AMQConnection(_connectionString, "guest", "guest", 
randomize("Client"), "/test_path"));
@@ -67,6 +61,7 @@
     protected void tearDown() throws Exception
     {
         super.tearDown();
+        TransportConnection.killAllVMBrokers();
     }
 
     private void init(AMQConnection connection) throws Exception
@@ -263,6 +258,6 @@
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new 
junit.framework.TestSuite(ObjectMessageTest.class));
+        return new junit.framework.TestSuite(ObjectMessageTest.class);
     }
 }

Added: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=auto&rev=486255
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
 Tue Dec 12 09:51:55 2006
@@ -0,0 +1,104 @@
+/**
+ * User: Robert Greig
+ * Date: 12-Dec-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ ******************************************************************************/
+package org.apache.qpid.test.unit.message;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQHeadersExchange;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+
+import javax.jms.*;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class StreamMessageTest extends TestCase
+{
+
+    private static final Logger _logger = 
Logger.getLogger(StreamMessageTest.class);
+
+    public String _connectionString = "vm://:1";
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+    }
+
+
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+    }
+
+    public void testStreamMessageEOF() throws Exception
+    {
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", 
"consumer1", "/test");
+        AMQSession consumerSession = (AMQSession) con.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+
+        AMQHeadersExchange queue = new AMQHeadersExchange(new 
AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+
 BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+        FieldTable ft = new PropertyFieldTable();
+        ft.setString("F1000","1");
+        MessageConsumer consumer = consumerSession.createConsumer(queue, 
AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, 
false, false, (String)null, ft);
+
+
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) 
consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, 
ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", 
"producer1", "/test");
+
+        AMQSession producerSession = (AMQSession) con2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+        // Need to start the "producer" connection in order to receive bounced messages
+        _logger.info("Starting producer connection");
+        con2.start();
+
+
+        MessageProducer mandatoryProducer = 
producerSession.createProducer(queue);
+
+        // Third test - should be routed
+        _logger.info("Sending routable message");
+        StreamMessage msg =  producerSession.createStreamMessage();
+
+        msg.setStringProperty("F1000","1");
+
+        msg.writeByte((byte)42);
+
+        mandatoryProducer.send(msg);
+
+
+
+        _logger.info("Starting consumer connection");
+        con.start();
+
+        StreamMessage msg2 = (StreamMessage) consumer.receive();
+
+        byte b1 = msg2.readByte();
+        try
+        {
+            byte b2 = msg2.readByte();
+        }
+        catch (Exception e)
+        {
+            assertTrue("Expected MessageEOFException: " + e, e instanceof 
MessageEOFException);
+        }
+
+
+
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to