Author: kpvdr
Date: Thu Jan 18 13:39:29 2007
New Revision: 497585
URL: http://svn.apache.org/viewvc?view=rev&rev=497585
Log:
Changed Content to use ByteBuffer; added Message.Transfer and Message.Cancel
handlers.
Modified:
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
Modified:
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
---
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
(original)
+++
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
Thu Jan 18 13:39:29 2007
@@ -1054,8 +1054,16 @@
int ordinal, int indentSize, int tabSize)
{
StringBuffer sb = new StringBuffer();
- sb.append(Utils.createSpaces(indentSize) +
- "buf.append(\" " + fieldName + ": \" + " + fieldName +
");" + cr);
+ if (domain.compareTo("longstr") == 0)
+ {
+ sb.append(Utils.createSpaces(indentSize) +
+ "buf.append(\" " + fieldName + ": \" + new
String(" + fieldName + "));" + cr);
+ }
+ else
+ {
+ sb.append(Utils.createSpaces(indentSize) +
+ "buf.append(\" " + fieldName + ": \" + " +
fieldName + ");" + cr);
+ }
return sb.toString();
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Thu Jan 18 13:39:29 2007
@@ -197,7 +197,7 @@
route(message);
break;
case CONTENT_TYPE_REFERENCE:
- getMessages(body.getContent()).add(message);
+ getMessages(body.getContentAsByteArray()).add(message);
break;
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
Thu Jan 18 13:39:29 2007
@@ -21,8 +21,11 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -48,7 +51,15 @@
AMQMethodEvent<MessageCancelBody> evt)
throws AMQException
{
- // TODO
+ final AMQChannel channel =
protocolSession.getChannel(evt.getChannelId());
+ final MessageCancelBody body = evt.getMethod();
+ channel.unsubscribeConsumer(protocolSession, body.destination);
+
+ // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQMethodBody methodBody =
MessageOkBody.createMethodBody((byte)0, (byte)9);
+ protocolSession.writeResponse(evt, methodBody);
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
Thu Jan 18 13:39:29 2007
@@ -74,11 +74,11 @@
protocolSession.closeChannel(evt.getChannelId());
// TODO: modify code gen to make getClazz and getMethod public
methods rather than protected
// then we can remove the hardcoded 0,0
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
+ // AMQP version change: Hardwire the version to 0-9 (major=0,
minor=9)
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions
change.
AMQMethodBody cf = ChannelCloseBody.createMethodBody
- ((byte)8, (byte)0, // AMQP version (major, minor)
+ ((byte)0, (byte)9, // AMQP version (major, minor)
MessageTransferBody.getClazz((byte)0, (byte)9), //
classId
MessageTransferBody.getMethod((byte)0, (byte)9), //
methodId
500, // replyCode
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Thu Jan 18 13:39:29 2007
@@ -150,7 +150,7 @@
Content body = _transferBody.getBody();
switch (body.getContentType()) {
case CONTENT_TYPE_INLINE:
- return _transferBody.getBody().getContent().length;
+ return _transferBody.getBody().getContent().limit();
case CONTENT_TYPE_REFERENCE:
return getReferenceSize();
default:
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Thu Jan 18 13:39:29 2007
@@ -557,8 +557,6 @@
messageHeaders.setExpiration(0);
}
}
-// messageHeaders.setDeliveryMode((byte) deliveryMode);
-// messageHeaders.setPriority((byte) priority);
int size = (payload != null) ? payload.limit() : 0;
Content[] content = createContent(payload);
@@ -656,7 +654,7 @@
if (frameCount == 1)
{
- bodies[0] = new
Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.array());
+ bodies[0] = new
Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload);
}
else
{
@@ -666,7 +664,7 @@
payload.position((int) framePayloadMax * i);
int length = (remaining >= framePayloadMax) ? (int)
framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- bodies[i] = new
Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice().array());
+ bodies[i] = new
Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice());
remaining -= length;
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
Thu Jan 18 13:39:29 2007
@@ -41,7 +41,7 @@
MessageHeaders contentHeader, Content body) throws
AMQException {
ByteBuffer data;
- data = ByteBuffer.allocate(body.content.length);
+ data = ByteBuffer.allocate(body.content.remaining());
data.put(body.content);
data.flip();
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
Thu Jan 18 13:39:29 2007
@@ -43,7 +43,7 @@
}
public ContentTypeEnum contentType;
- public byte[] content;
+ public ByteBuffer content;
// Constructors
@@ -63,20 +63,13 @@
throw new IllegalArgumentException("Content cannot be empty for
a ref type.");
}
this.contentType = contentType;
- this.content = content;
+ this.content = ByteBuffer.allocate(content.length);
+ this.content.put(content);
}
- public Content(ContentTypeEnum contentType, String content)
+ public Content(ContentTypeEnum contentType, String contentStr)
{
- if (contentType == ContentTypeEnum.CONTENT_TYPE_REFERENCE)
- {
- if (content == null)
- throw new IllegalArgumentException("Content cannot be null for
a ref type.");
- if (content.length() == 0)
- throw new IllegalArgumentException("Content cannot be empty for
a ref type.");
- }
- this.contentType = contentType;
- this.content = content.getBytes();
+ this(contentType, contentStr.getBytes());
}
public Content(ContentTypeEnum contentType, ByteBuffer content)
@@ -89,18 +82,26 @@
throw new IllegalArgumentException("Content cannot be empty for
a ref type.");
}
this.contentType = contentType;
- this.content = content.array();
+ this.content = content;
}
// Get functions
public ContentTypeEnum getContentType() { return contentType; }
- public byte[] getContent() { return content; }
+ public ByteBuffer getContent() { return content; }
+
+ public byte[] getContentAsByteArray()
+ {
+ byte[] ba = new byte[content.remaining()];
+ content.get(ba);
+ return ba;
+ }
+
public String getContentAsString()
{
if (content == null)
return null;
- return new String(content);
+ return new String(getContentAsByteArray());
}
// Wire functions
@@ -109,18 +110,22 @@
{
if (content == null)
return 1 + 4;
- return 1 + 4 + content.length;
+ return 1 + 4 + content.remaining();
}
public void writePayload(ByteBuffer buffer)
{
EncodingUtils.writeUnsignedByte(buffer, contentType.toByte());
- EncodingUtils.writeLongStringBytes(buffer, content);
+ EncodingUtils.writeUnsignedInteger(buffer, content.remaining());
+ buffer.put(content);
}
public void populateFromBuffer(ByteBuffer buffer) throws
AMQFrameDecodingException
{
contentType = ContentTypeEnum.toContentEnum(buffer.get());
- content = EncodingUtils.readLongstr(buffer);
+ int length = buffer.getInt();
+ content = buffer.slice();
+ buffer.skip(length);
+ content.limit(length);
}
}