Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=496586&r1=496585&r2=496586 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original) +++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Mon Jan 15 20:44:48 2007 @@ -24,7 +24,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.configuration.Configured; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.configuration.Configurator; import java.util.ArrayList; @@ -97,7 +96,8 @@ private boolean addMessageToQueue(AMQMessage msg) { // Shrink the ContentBodies to their actual size to save memory. - if (compressBufferOnQueue) + if (true) throw new Error("XXX"); + /*if (compressBufferOnQueue) { Iterator it = msg.getContentBodies().iterator(); while (it.hasNext()) @@ -105,7 +105,7 @@ ContentBody cb = (ContentBody) it.next(); cb.reduceBufferToFit(); } - } + }*/ _messages.offer(msg);
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?view=diff&rev=496586&r1=496585&r2=496586 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (original) +++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Mon Jan 15 20:44:48 2007 @@ -21,9 +21,6 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.protocol.AMQConstant; import java.util.List; @@ -35,19 +32,14 @@ */ public class NoConsumersException extends RequiredDeliveryException { - public NoConsumersException(String queue, - BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) + public NoConsumersException(String queue, AMQMessage message) { - super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies); + super("Immediate delivery to " + queue + " is not possible.", message); } - public NoConsumersException(BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) + public NoConsumersException(AMQMessage message) { - super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies); + super("Immediate delivery is not possible.", message); } public int getReplyCode() Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=496586&r1=496585&r2=496586 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original) +++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Mon Jan 15 20:44:48 2007 @@ -28,9 +28,8 @@ import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -235,7 +234,9 @@ { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + ByteBuffer deliver = null; + if (true) throw new Error("XXX"); + //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); protocolSession.writeFrame(frame); @@ -268,7 +269,9 @@ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + ByteBuffer deliver = null; + if (true) throw new Error("XXX"); + //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); protocolSession.writeFrame(frame); @@ -382,10 +385,11 @@ // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) + if (true) throw new Error("XXX"); + /*protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) consumerTag // consumerTag - )); + ));*/ _closed = true; } } @@ -396,12 +400,12 @@ } - private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) + /* private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), + AMQFrame deliverFrame = MessageTransferBody.createAMQFrame(channel.getChannelId(), (byte)0, (byte)9, // AMQP version (major, minor) consumerTag, // consumerTag deliveryTag, // deliveryTag @@ -413,5 +417,5 @@ deliverFrame.writePayload(buf); buf.flip(); return buf; - } + }*/ } Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java?view=diff&rev=496586&r1=496585&r2=496586 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java (original) +++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java Mon Jan 15 20:44:48 2007 @@ -24,6 +24,8 @@ public class AMQRequestBody extends AMQBody { + public static final byte TYPE = 9; + // Fields declared in specification protected long requestId; protected long responseMark; @@ -54,14 +56,14 @@ protected int getSize() { - return 8 + 8 + 4 + methodPayload.getBodySize(); + return 8 + 8 + 4 + methodPayload.getSize(); } protected void writePayload(ByteBuffer buffer) { EncodingUtils.writeLong(buffer, requestId); EncodingUtils.writeLong(buffer, responseMark); - EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0 + EncodingUtils.writeInteger(buffer, 0); // reserved, set to 0 methodPayload.writePayload(buffer); } @@ -70,7 +72,7 @@ { requestId = EncodingUtils.readLong(buffer); responseMark = EncodingUtils.readLong(buffer); - int reserved = EncodingUtils.readShort(buffer); // reserved, throw away + int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); } Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java?view=diff&rev=496586&r1=496585&r2=496586 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java (original) +++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java Mon Jan 15 20:44:48 2007 @@ -24,6 +24,8 @@ public class AMQResponseBody extends AMQBody { + public static final byte TYPE = 10; + // Fields declared in specification protected long responseId; protected long requestId; @@ -54,14 +56,15 @@ protected int getSize() { - return 8 + 8 + 4 + methodPayload.getBodySize(); + return 8 + 8 + 4 + methodPayload.getSize(); } protected void writePayload(ByteBuffer buffer) { EncodingUtils.writeLong(buffer, responseId); EncodingUtils.writeLong(buffer, requestId); - EncodingUtils.writeUnsignedShort(buffer, batchOffset); + // XXX + EncodingUtils.writeInteger(buffer, batchOffset); methodPayload.writePayload(buffer); } @@ -70,7 +73,8 @@ { responseId = EncodingUtils.readLong(buffer); requestId = EncodingUtils.readLong(buffer); - batchOffset = EncodingUtils.readShort(buffer); + // XXX + batchOffset = EncodingUtils.readInteger(buffer); methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); } Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java?view=diff&rev=496586&r1=496585&r2=496586 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java (original) +++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java Mon Jan 15 20:44:48 2007 @@ -20,9 +20,10 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; public interface AMQProtocolWriter @@ -34,8 +35,12 @@ public void writeFrame(AMQDataBlock frame); public long writeRequest(int channelNum, AMQMethodBody methodBody, - AMQMethodListener methodListener) throws RequestResponseMappingException; + AMQMethodListener methodListener) + throws AMQException; public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) - throws RequestResponseMappingException; + throws AMQException; + + public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) + throws AMQException; }
