Author: kpvdr
Date: Thu Feb 15 07:23:55 2007
New Revision: 507960
URL: http://svn.apache.org/viewvc?view=rev&rev=507960
Log:
Fixes to get TransactedTest working again; however, there are still
unresolved issues with rollback().
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=507960&r1=507959&r2=507960
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Thu Feb 15 07:23:55 2007
@@ -639,50 +639,47 @@
/**
* Called to resend all outstanding unacknowledged messages to this same
channel.
*/
- public void resend(final AMQProtocolSession session, final boolean
requeue) throws AMQException
- {
- throw new Error("XXX");
-// final List<UnacknowledgedMessage> msgToRequeue = new
LinkedList<UnacknowledgedMessage>();
-//
-// _unacknowledgedMessageMap.visit(new
UnacknowledgedMessageMap.Visitor()
-// {
-// public boolean callback(UnacknowledgedMessage message) throws
AMQException
-// {
-// long deliveryTag = message.deliveryTag;
-// AMQShortString consumerTag = message.consumerTag;
-// AMQMessage msg = message.message;
-// msg.setRedelivered(true);
-// // working
-// // deliver(msg, consumerTag, deliveryTag);
-// // trunk
-// if((consumerTag != null) &&
_consumerTag2QueueMap.containsKey(consumerTag))
-// {
-// msg.writeDeliver(session, _channelId, deliveryTag,
consumerTag);
-// }
-// else
-// {
-// // Message has no consumer tag, so was "delivered" to a
GET
-// // or consumer no longer registered
-// // cannot resend, so re-queue.
-// if (message.queue != null && (consumerTag == null ||
requeue))
-// {
-// msgToRequeue.add(message);
-// }
-// }
-// // false means continue processing
-// return false;
-// }
-//
-// public void visitComplete()
-// {
-// }
-// });
-//
-// for(UnacknowledgedMessage message : msgToRequeue)
-// {
-// _txnContext.deliver(message.message, message.queue);
-// _unacknowledgedMessageMap.remove(message.deliveryTag);
-// }
+ public void resend(final AMQProtocolSession session, final boolean
requeue) throws AMQException
+ {
+ final List<UnacknowledgedMessage> msgToRequeue = new
LinkedList<UnacknowledgedMessage>();
+
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws
AMQException
+ {
+ long deliveryTag = message.deliveryTag;
+ AMQShortString consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
+ msg.setRedelivered(true);
+ if((consumerTag != null) &&
_consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ deliver(msg, consumerTag, deliveryTag);
+ //msg.writeDeliver(session, _channelId, deliveryTag,
consumerTag);
+ }
+ else
+ {
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null && (consumerTag == null ||
requeue))
+ {
+ msgToRequeue.add(message);
+ }
+ }
+ // false means continue processing
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
+
+ for(UnacknowledgedMessage message : msgToRequeue)
+ {
+ _txnContext.deliver(message.message, message.queue);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ }
}
/**
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?view=diff&rev=507960&r1=507959&r2=507960
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
Thu Feb 15 07:23:55 2007
@@ -59,7 +59,7 @@
super.setUp();
TransportConnection.createVMBroker(1);
queue1 = new AMQQueue(new AMQShortString("Q1"), new
AMQShortString("Q1"), false, true);
- queue2 = new AMQQueue("Q2", false);
+ queue2 = new AMQQueue("Q2x", false);
con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest",
"test");
session = con.createSession(true, 0);
@@ -74,15 +74,6 @@
prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
prepProducer1 = prepSession.createProducer(queue1);
prepCon.start();
-
-// //add some messages
-// prepProducer1.send(prepSession.createTextMessage("A"));
-// prepProducer1.send(prepSession.createTextMessage("B"));
-// prepProducer1.send(prepSession.createTextMessage("C"));
-//
-// testCon = new AMQConnection("vm://:1", "guest", "guest",
"TestConnection", "/test");
-// testSession = testCon.createSession(false,
AMQSession.NO_ACKNOWLEDGE);
-// testConsumer2 = testSession.createConsumer(queue2);
}
protected void tearDown() throws Exception
@@ -110,8 +101,9 @@
//commit
session.commit();
- testCon.start();
+
//ensure sent messages can be received and received messages are gone
+
testCon = new AMQConnection("vm://:1", "guest", "guest",
"TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
testConsumer1 = testSession.createConsumer(queue1);
@@ -156,10 +148,10 @@
expect("A", consumer1.receive(1000));
expect("B", consumer1.receive(1000));
expect("C", consumer1.receive(1000));
- testCon.start();
- testConsumer1 = testSession.createConsumer(queue1);
+
//commit
- session.commit();
+ //session.commit();
+
testCon = new AMQConnection("vm://:1", "guest", "guest",
"TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -175,6 +167,7 @@
// messages left over from the last test (which can affect later tests)...
public void testEmpty2() throws Exception
{
+//System.out.println("=== DEBUG === testEmpty2(): assertTrue(null ==
consumer1.receive(1000));");
assertTrue(null == consumer1.receive(1000));
}
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?view=diff&rev=507960&r1=507959&r2=507960
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
Thu Feb 15 07:23:55 2007
@@ -832,10 +832,14 @@
if(_encodedForm != null)
{
+ // FIXME: This is a quick fix for a problem where the ByteBuffer
_encodedForm
+ // becomes consumed if debug messages are printed which involve a
FieldTable,
+ // and for some tests. This is a rather ugly quick-fix...
if (_encodedForm.remaining() == 0)
{
_encodedForm.rewind();
}
+// _encodedForm.flip();
buffer.put(_encodedForm);
}
else if(_properties != null)