Author: kpvdr
Date: Thu Feb 15 07:23:55 2007
New Revision: 507960

URL: http://svn.apache.org/viewvc?view=rev&rev=507960
Log:
Fixes to get TransactedTest back, there are still unresolved issues with 
rollback(), however.

Modified:
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=507960&r1=507959&r2=507960
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Thu Feb 15 07:23:55 2007
@@ -639,50 +639,47 @@
     /**
      * Called to resend all outstanding unacknowledged messages to this same 
channel.
      */
-     public void resend(final AMQProtocolSession session, final boolean 
requeue) throws AMQException
-     {
-        throw new Error("XXX");
-//         final List<UnacknowledgedMessage> msgToRequeue = new 
LinkedList<UnacknowledgedMessage>();
-// 
-//         _unacknowledgedMessageMap.visit(new 
UnacknowledgedMessageMap.Visitor()
-//         {
-//             public boolean callback(UnacknowledgedMessage message) throws 
AMQException
-//             {
-//                 long deliveryTag = message.deliveryTag;
-//                 AMQShortString consumerTag = message.consumerTag;
-//                 AMQMessage msg = message.message;
-//                 msg.setRedelivered(true);
-//                 // working
-// //                deliver(msg, consumerTag, deliveryTag);
-//                 // trunk
-//                 if((consumerTag != null) && 
_consumerTag2QueueMap.containsKey(consumerTag))
-//                 {
-//                     msg.writeDeliver(session, _channelId, deliveryTag, 
consumerTag);
-//                 }
-//                 else
-//                 {
-//                     // Message has no consumer tag, so was "delivered" to a 
GET
-//                     // or consumer no longer registered
-//                     // cannot resend, so re-queue.
-//                     if (message.queue != null && (consumerTag == null || 
requeue))
-//                     {
-//                         msgToRequeue.add(message);                         
-//                     }
-//                 }
-//                 // false means continue processing
-//                 return false;
-//             }
-// 
-//             public void visitComplete()
-//             {
-//             }
-//         });
-// 
-//         for(UnacknowledgedMessage message : msgToRequeue)
-//         {
-//             _txnContext.deliver(message.message, message.queue);
-//             _unacknowledgedMessageMap.remove(message.deliveryTag);
-//         }
+    public void resend(final AMQProtocolSession session, final boolean 
requeue) throws AMQException
+    {
+        final List<UnacknowledgedMessage> msgToRequeue = new 
LinkedList<UnacknowledgedMessage>();
+
+        _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+        {
+           public boolean callback(UnacknowledgedMessage message) throws 
AMQException
+            {
+                long deliveryTag = message.deliveryTag;
+                AMQShortString consumerTag = message.consumerTag;
+                AMQMessage msg = message.message;
+                msg.setRedelivered(true);
+                if((consumerTag != null) && 
_consumerTag2QueueMap.containsKey(consumerTag))
+                {
+                    deliver(msg, consumerTag, deliveryTag);
+                    //msg.writeDeliver(session, _channelId, deliveryTag, 
consumerTag);
+                }
+                else
+                {
+                    // Message has no consumer tag, so was "delivered" to a GET
+                    // or consumer no longer registered
+                    // cannot resend, so re-queue.
+                    if (message.queue != null && (consumerTag == null || 
requeue))
+                    {
+                        msgToRequeue.add(message);                         
+                    }
+                }
+                // false means continue processing
+                return false;
+            }
+
+            public void visitComplete()
+            {
+            }
+        });
+
+        for(UnacknowledgedMessage message : msgToRequeue)
+        {
+            _txnContext.deliver(message.message, message.queue);
+            _unacknowledgedMessageMap.remove(message.deliveryTag);
+        }
      }
 
     /**

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?view=diff&rev=507960&r1=507959&r2=507960
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
 Thu Feb 15 07:23:55 2007
@@ -59,7 +59,7 @@
         super.setUp();
         TransportConnection.createVMBroker(1);
         queue1 = new AMQQueue(new AMQShortString("Q1"), new 
AMQShortString("Q1"), false, true);
-        queue2 = new AMQQueue("Q2", false);
+        queue2 = new AMQQueue("Q2x", false);
 
         con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", 
"test");
         session = con.createSession(true, 0);
@@ -74,15 +74,6 @@
         prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         prepProducer1 = prepSession.createProducer(queue1);
         prepCon.start();
-
-//         //add some messages
-//         prepProducer1.send(prepSession.createTextMessage("A"));
-//         prepProducer1.send(prepSession.createTextMessage("B"));
-//         prepProducer1.send(prepSession.createTextMessage("C"));
-// 
-//         testCon = new AMQConnection("vm://:1", "guest", "guest", 
"TestConnection", "/test");
-//         testSession = testCon.createSession(false, 
AMQSession.NO_ACKNOWLEDGE);
-//         testConsumer2 = testSession.createConsumer(queue2);
     }
 
     protected void tearDown() throws Exception
@@ -110,8 +101,9 @@
 
         //commit
         session.commit();
-        testCon.start();
+
         //ensure sent messages can be received and received messages are gone
+
         testCon = new AMQConnection("vm://:1", "guest", "guest", 
"TestConnection", "test");
         testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         testConsumer1 = testSession.createConsumer(queue1);
@@ -156,10 +148,10 @@
         expect("A", consumer1.receive(1000));
         expect("B", consumer1.receive(1000));
         expect("C", consumer1.receive(1000));
-        testCon.start();
-        testConsumer1 = testSession.createConsumer(queue1);
+
         //commit
-        session.commit();
+        //session.commit();
+
 
         testCon = new AMQConnection("vm://:1", "guest", "guest", 
"TestConnection", "test");
         testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -175,6 +167,7 @@
     // messages left over from the last test (which can affect later tests)...
     public void testEmpty2() throws Exception
     {
+//System.out.println("=== DEBUG === testEmpty2(): assertTrue(null == 
consumer1.receive(1000));");
         assertTrue(null == consumer1.receive(1000));
     }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?view=diff&rev=507960&r1=507959&r2=507960
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
 Thu Feb 15 07:23:55 2007
@@ -832,10 +832,14 @@
 
         if(_encodedForm != null)
         {
+            // FIXME: This is a quick fix for a problem where the ByteBuffer 
_encodedForm
+            // becomes consumed if debug messages are printed which involve a 
FieldTable,
+            // and for some tests. This is a rather ugly quick-fix...
             if (_encodedForm.remaining() == 0)
             {
                 _encodedForm.rewind();
             }
+//            _encodedForm.flip();
             buffer.put(_encodedForm);
         }
         else if(_properties != null)


Reply via email to