Author: kpvdr
Date: Thu Feb 15 11:30:50 2007
New Revision: 508098
URL: http://svn.apache.org/viewvc?view=rev&rev=508098
Log:
Adjusted mechanism for sending refs. (still in progress)
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=508098&r1=508097&r2=508098
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Thu Feb 15 11:30:50 2007
@@ -409,37 +409,43 @@
public void deliverRef(final AMQMessage msg, final AMQShortString
destination, final long deliveryTag)
{
final byte[] refId =
String.valueOf(System.currentTimeMillis()).getBytes();
- AMQMethodBody openBody = MessageOpenBody.createMethodBody(
- _session.getProtocolMajorVersion(), // AMQP major version
- _session.getProtocolMinorVersion(), // AMQP minor version
- refId);
- _session.writeRequest(_channelId, openBody, new AMQMethodListener()
- {
- public boolean methodReceived(AMQMethodEvent evt) throws
AMQException
- {
- AMQMethodBody method = evt.getMethod();
- if (_log.isDebugEnabled())
- {
- _log.debug(method + " received on channel " + _channelId);
- }
- if (method instanceof MessageOkBody)
- {
- acknowledgeMessage(deliveryTag, false);
- deliverRef(refId, msg, destination,
_session.getStateManager());
- return true;
- }
- else
- {
- // TODO: implement reject
- return false;
- }
- }
- public void error(Exception e) {}
- });
+ deliverRef(refId, msg, destination, _session.getStateManager());
+// AMQMethodBody openBody = MessageOpenBody.createMethodBody(
+// _session.getProtocolMajorVersion(), // AMQP major version
+// _session.getProtocolMinorVersion(), // AMQP minor version
+// refId);
+// _session.writeRequest(_channelId, openBody, new AMQMethodListener()
+// {
+// public boolean methodReceived(AMQMethodEvent evt) throws
AMQException
+// {
+// AMQMethodBody method = evt.getMethod();
+// if (_log.isDebugEnabled())
+// {
+// _log.debug(method + " received on channel " +
_channelId);
+// }
+// if (method instanceof MessageOkBody)
+// {
+// acknowledgeMessage(deliveryTag, false);
+// deliverRef(refId, msg, destination,
_session.getStateManager());
+// return true;
+// }
+// else
+// {
+// // TODO: implement reject
+// return false;
+// }
+// }
+// public void error(Exception e) {}
+// });
}
public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString
destination, AMQMethodListener listener)
{
+ AMQMethodBody openBody = MessageOpenBody.createMethodBody(
+ _session.getProtocolMajorVersion(), // AMQP major version
+ _session.getProtocolMinorVersion(), // AMQP minor version
+ refId);
+ _session.writeRequest(_channelId, openBody, listener);
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
mtb.redelivered = msg.isRedelivered();