Author: kpvdr
Date: Fri Feb 16 08:02:26 2007
New Revision: 508460
URL: http://svn.apache.org/viewvc?view=rev&rev=508460
Log:
Additions to allow refs to be sent from broker to client. Also some tidy-up.
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb 16 08:02:26 2007
@@ -309,33 +309,6 @@
_returnMessages.add(e);
}
}
-//
-// public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
-// {
-// deliver(msg, destination, new AMQMethodListener()
-// {
-// public boolean methodReceived(AMQMethodEvent evt) throws AMQException
-// {
-// AMQMethodBody method = evt.getMethod();
-// if (_log.isDebugEnabled())
-// {
-// _log.debug(method + " received on channel " + _channelId);
-// }
-// // XXX: multiple?
-// if (method instanceof MessageOkBody)
-// {
-// acknowledgeMessage(deliveryTag, false);
-// return true;
-// }
-// else
-// {
-// // TODO: implement reject
-// return false;
-// }
-// }
-// public void error(Exception e) {}
-// });
-// }
public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
{
@@ -343,7 +316,6 @@
long maxFrameSize = _session.getFrameMax();
Iterable<ByteBuffer> contentItr = msg.getContents();
if (msg.getSize() > maxFrameSize)
- //if(true)
{
Iterator<ByteBuffer> cItr = contentItr.iterator();
if (cItr.next().limit() > maxFrameSize) // First chunk should equal incoming frame size
@@ -410,34 +382,29 @@
public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag)
{
final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes();
- deliverRef(refId, msg, destination, _session.getStateManager());
-// AMQMethodBody openBody = MessageOpenBody.createMethodBody(
-// _session.getProtocolMajorVersion(), // AMQP major version
-// _session.getProtocolMinorVersion(), // AMQP minor version
-// refId);
-// _session.writeRequest(_channelId, openBody, new AMQMethodListener()
-// {
-// public boolean methodReceived(AMQMethodEvent evt) throws AMQException
-// {
-// AMQMethodBody method = evt.getMethod();
-// if (_log.isDebugEnabled())
-// {
-// _log.debug(method + " received on channel " + _channelId);
-// }
-// if (method instanceof MessageOkBody)
-// {
-// acknowledgeMessage(deliveryTag, false);
-// deliverRef(refId, msg, destination, _session.getStateManager());
-// return true;
-// }
-// else
-// {
-// // TODO: implement reject
-// return false;
-// }
-// }
-// public void error(Exception e) {}
-// });
+ deliverRef(refId, msg, destination, new AMQMethodListener()
+ {
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ {
+ AMQMethodBody method = evt.getMethod();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(method + " received on channel " + _channelId);
+ }
+ // XXX: multiple?
+ if (method instanceof MessageOkBody)
+ {
+ acknowledgeMessage(deliveryTag, false);
+ return true;
+ }
+ else
+ {
+ // TODO: implement reject
+ return false;
+ }
+ }
+ public void error(Exception e) {}
+ });
}
public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
@@ -470,58 +437,6 @@
refId);
_session.writeRequest(_channelId, closeBody, listener);
}
-
-// protected void route(AMQMessage msg) throws AMQException
-// {
-// if (isTransactional())
-// {
-// //don't create a transaction unless needed
-// if (msg.isPersistent())
-// {
-// // _txnBuffer.containsPersistentChanges();
-// }
-//
-// //A publication will result in the enlisting of several
-// //TxnOps. The first is an op that will store the message.
-// //Following that (and ordering is important), an op will
-// //be added for every queue onto which the message is
-// //enqueued. Finally a cleanup op will be added to decrement
-// //the reference associated with the routing.
-// // Store storeOp = new Store(msg);
-// // _txnBuffer.enlist(storeOp);
-// // msg.setTxnBuffer(_txnBuffer);
-// try
-// {
-// _exchanges.routeContent(msg);
-// // _txnBuffer.enlist(new Cleanup(msg));
-// }
-// catch (RequiredDeliveryException e)
-// {
-// //Can only be due to the mandatory flag, as no attempt
-// //has yet been made to deliver the message. The
-// //message will thus not have been delivered to any
-// //queue so we can return the message (without killing
-// //the transaction) and for efficiency remove the store
-// //operation from the buffer.
-// // _txnBuffer.cancel(storeOp);
-// throw e;
-// }
-// }
-// else
-// {
-// try
-// {
-// _exchanges.routeContent(msg);
-// //following check implements the functionality
-// //required by the 'immediate' flag:
-// msg.checkDeliveredToConsumer();
-// }
-// finally
-// {
-// msg.decrementReference(_storeContext);
-// }
-// }
-// }
public RequestManager getRequestManager()
{
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Fri Feb 16 08:02:26 2007
@@ -46,6 +46,12 @@
{
AMQProtocolSession session = stateManager.getProtocolSession();
_logger.info("Received Connection-close-ok");
+ // We wait for the Mina library to close the connection, which will happen when
+ // the client closes the Mina connection, causing AMQFastProtocolHand.sessionClosed()
+ // to be called.
+ // TODO - Find a better way of doing this without holding up this thread...
+ try { Thread.currentThread().sleep(2000); } // 2 seconds
+ catch (InterruptedException e) {}
session.closeSession();
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Feb 16 08:02:26 2007
@@ -29,6 +29,7 @@
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -120,7 +121,11 @@
_log.info("Queue " + body.queue + " exists and is accesible to this connection [owner=" + queue.getOwner() +"]");
}
//set this as the default queue on the channel:
- session.getChannel(evt.getChannelId()).setDefaultQueue(queue);
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ if (channel == null)
+ throw new AMQException("Attempt to write to non-existent channel " + evt.getChannelId() + ": " + body);
+ channel.setDefaultQueue(queue);
+ //session.getChannel(evt.getChannelId()).setDefaultQueue(queue);
}
if (!body.nowait)
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Feb 16 08:02:26 2007
@@ -375,6 +375,10 @@
{
checkMethodBodyVersion(methodBody);
AMQChannel channel = getChannel(channelNum);
+ if (channel == null)
+ {
+ throw new RuntimeException("Attempt to write to non-existent channel " + channelNum + " method=" +methodBody);
+ }
RequestManager requestManager = channel.getRequestManager();
return requestManager.sendRequest(methodBody, methodListener);
}
@@ -389,10 +393,17 @@
{
checkMethodBodyVersion(methodBody);
AMQChannel channel = getChannel(channelNum);
+ if (channel == null)
+ {
+ throw new RuntimeException("Attempt to write to non-existent channel " + channelNum + ": reqId=" + requestId + " method=" + methodBody);
+ }
ResponseManager responseManager = channel.getResponseManager();
- try {
+ try
+ {
responseManager.sendResponse(requestId, methodBody);
- } catch (RequestResponseMappingException e) {
+ }
+ catch (RequestResponseMappingException e)
+ {
throw new RuntimeException(e);
}
}
@@ -599,7 +610,8 @@
task.doTask(this);
}
}
- _minaProtocolSession.close();
+// gsim-python
+// _minaProtocolSession.close();
}
/**
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Fri Feb 16 08:02:26 2007
@@ -163,7 +163,9 @@
else
{
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
+
// TODO: Closing with code 200 ("reply-sucess") ??? This cannot be right!
+ // gsim-python
//session.closeSessionRequest(200, new AMQShortString(throwable.getMessage()));
session.closeSession();
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java Fri Feb 16 08:02:26 2007
@@ -24,7 +24,9 @@
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.log4j.Logger;
@@ -47,7 +49,12 @@
try
{
protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
- System.out.println("Message.appened()-->Appending message content to body");
+
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+ protocolSession.getProtocolMajorVersion(), // AMQP major version
+ protocolSession.getProtocolMinorVersion()); // AMQP minor version
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
catch (Exception e)
{
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java Fri Feb 16 08:02:26 2007
@@ -24,7 +24,9 @@
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.log4j.Logger;
@@ -46,10 +48,15 @@
{
MessageCloseBody body = (MessageCloseBody)evt.getMethod();
String referenceId = new String(body.getReference());
- System.out.println("Message.closing()-->Handing message to session");
protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
_logger.debug("Method Close Body received, notify session to accept unprocessed message");
+
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+ protocolSession.getProtocolMajorVersion(), // AMQP major version
+ protocolSession.getProtocolMinorVersion()); // AMQP minor version
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java Fri Feb 16 08:02:26 2007
@@ -25,7 +25,9 @@
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
//import org.apache.log4j.Logger;
@@ -45,11 +47,15 @@
public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), null, false);
- String referenceId = new String(((MessageOpenBody)evt.getMethod()).getReference());
- protocolSession.unprocessedMessageReceived(referenceId, msg);
-
- System.out.println("Message.open()-->Adding message to map with ref");
+ byte[] referenceId = ((MessageOpenBody)evt.getMethod()).getReference();
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), referenceId);
+ protocolSession.unprocessedMessageReceived(new String(referenceId), msg);
+
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+ protocolSession.getProtocolMajorVersion(), // AMQP major version
+ protocolSession.getProtocolMinorVersion()); // AMQP minor version
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Fri Feb 16 08:02:26 2007
@@ -35,11 +35,19 @@
{
private int bytesReceived = 0;
private int channelId;
+ private byte[] referenceId;
private List<byte[]> contents = new LinkedList();
private long deliveryTag;
private boolean redeliveredFlag;
private MessageHeaders messageHeaders;
+ public UnprocessedMessage(int channelId, long deliveryTag, byte[] referenceId)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.referenceId = referenceId;
+ }
+
public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag)
{
this.channelId = channelId;
@@ -73,6 +81,11 @@
return channelId;
}
+ public byte[] getReferenceId()
+ {
+ return referenceId;
+ }
+
public List<byte[]> getContents()
{
return contents;
@@ -95,7 +108,9 @@
public String toString()
{
- return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + new String(contents.get(0));
+ return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" +
+ deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" +
+ new String(contents.get(0));
}
public void setMessageHeaders(MessageHeaders messageHeaders) {
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java?view=diff&rev=508460&r1=508459&r2=508460
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java Fri Feb 16 08:02:26 2007
@@ -65,27 +65,19 @@
assertEquals("Hello", tm1.getText());
}
- public static void main(String[] args){
- PubSubTwoConnectionTest test = new PubSubTwoConnectionTest();
- try {
- //test.setUp();
- //test.testTwoConnections();
- int a = 5;
-
- System.out.println(a++);
- System.out.println(a);
- System.out.println(++a);
-
- int b = ++a;
- int c = a++;
-
- System.out.println(b);
- System.out.println(c);
- System.out.println(a);
-
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ public static void main(String[] args)
+ {
+ PubSubTwoConnectionTest test = new PubSubTwoConnectionTest();
+ try
+ {
+ test.setUp();
+ test.testTwoConnections();
+ test.tearDown();
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
}