Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 20 08:20:41 2007 @@ -1156,6 +1156,41 @@ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } + + public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException + { + AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + null, // arguments + autoDelete, // autoDelete + durable, // durable + exclusive, // exclusive + false, // nowait + false, // passive + name, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + + } + + + public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException + { + // TODO: Be aware of possible changes to parameter order as versions change. + AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + arguments, // arguments + exchangeName, // exchange + false, // nowait + queueName, // queue + routingKey, // routingKey + getTicket()); // ticket + + + getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + } + /** * Declare the queue. *
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java (original) +++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java Tue Feb 20 08:20:41 2007 @@ -22,13 +22,19 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.MethodConverter_8_0; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.queue.AMQMessage; import java.util.Iterator; public class SimpleSendable implements Sendable { + + //todo fixme - remove 0-8 hard coding + ProtocolVersionMethodConverter _methodConverter = new MethodConverter_8_0(); + private final AMQMessage _message; public SimpleSendable(AMQMessage message) @@ -38,12 +44,12 @@ public void send(int channel, Member member) throws AMQException { - member.send(new AMQFrame(channel, _message.getPublishBody())); + member.send(new AMQFrame(channel, _methodConverter.convertToBody(_message.getMessagePublishInfo()))); member.send(new AMQFrame(channel, _message.getContentHeaderBody())); - Iterator<ContentBody> it = _message.getContentBodyIterator(); + Iterator<ContentChunk> it = _message.getContentBodyIterator(); while (it.hasNext()) { - member.send(new AMQFrame(channel, it.next())); + member.send(new AMQFrame(channel, _methodConverter.convertToBody(it.next()))); } } } Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java (original) +++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java Tue Feb 20 08:20:41 2007 @@ -31,8 +31,6 @@ import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.concurrent.Executor; - /** * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does * not require all the functionality currently in AMQQueue. @@ -81,8 +79,11 @@ void relay(AMQMessage msg) throws AMQException { - BasicPublishBody publish = msg.getPublishBody(); - publish.immediate = false; //can't as yet handle the immediate flag in a cluster + // TODO FIXME - can no longer update the publish body as it is an opaque wrapper object + // if cluster can handle immediate then it should wrap the wrapper... + +// BasicPublishBody publish = msg.getMessagePublishInfo(); +// publish.immediate = false; //can't as yet handle the immediate flag in a cluster // send this on to the broker for which it is acting as proxy: _groupMgr.send(_target, new SimpleSendable(msg)); Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Tue Feb 20 08:20:41 2007 @@ -96,6 +96,8 @@ } } + + public static AMQFrame createAMQFrame(int channelId, ContentBody body) { final AMQFrame frame = new AMQFrame(channelId, body); Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Tue Feb 20 08:20:41 2007 @@ -641,8 +641,7 @@ public static void writeTimestamp(ByteBuffer buffer, long timestamp) { - writeUnsignedInteger(buffer, 0/*timestamp msb*/); - writeUnsignedInteger(buffer, timestamp); + writeLong(buffer, timestamp); } public static boolean[] readBooleans(ByteBuffer buffer) @@ -765,8 +764,8 @@ public static long readTimestamp(ByteBuffer buffer) { // Discard msb from AMQ timestamp - buffer.getUnsignedInt(); - return buffer.getUnsignedInt(); + //buffer.getUnsignedInt(); + return buffer.getLong(); } Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java?view=auto&rev=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java Tue Feb 20 08:20:41 2007 @@ -0,0 +1,104 @@ +package org.apache.qpid.framing; + +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; + +import org.apache.mina.common.ByteBuffer; + +public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private int _basicPublishClassId; + private int _basicPublishMethodId; + + public MethodConverter_8_0() + { + super((byte)8,(byte)0); + + + } + + public AMQBody convertToBody(ContentChunk contentChunk) + { + return new ContentBody(contentChunk.getData()); + } + + public ContentChunk convertToContentChunk(AMQBody body) + { + final ContentBody contentBodyChunk = (ContentBody) body; + + return new ContentChunk() + { + + public int getSize() + { + return contentBodyChunk.getSize(); + } + + public ByteBuffer getData() + { + return contentBodyChunk.payload; + } + + public void reduceToFit() + { + contentBodyChunk.reduceBufferToFit(); + } + }; + + } + + public void configure() + { + + _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion()); + _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion()); + + } + + public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) + { + final BasicPublishBody body = (BasicPublishBody) methodBody; + + return new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return body.getExchange(); + } + + public boolean isImmediate() + { + return body.getImmediate(); + } + + public boolean isMandatory() + { + return body.getMandatory(); + } + + public AMQShortString getRoutingKey() + { + return body.getRoutingKey(); + } + }; + + } + + public AMQMethodBody convertToBody(MessagePublishInfo info) + { + + return new BasicPublishBody(getProtocolMajorVersion(), + getProtocolMinorVersion(), + _basicPublishClassId, + _basicPublishMethodId, + info.getExchange(), + info.isImmediate(), + info.isMandatory(), + info.getRoutingKey(), + 0) ; // ticket + + } +} Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Tue Feb 20 08:20:41 2007 @@ -20,6 +20,8 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; + import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; @@ -36,10 +38,53 @@ private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][]; + private ProtocolVersionMethodConverter _protocolVersionConverter; + public VersionSpecificRegistry(byte major, byte minor) { _protocolMajorVersion = major; _protocolMinorVersion = minor; + + _protocolVersionConverter = loadProtocolVersionConverters(major, minor); + } + + private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, byte protocolMinorVersion) + { + try + { + Class<ProtocolVersionMethodConverter> versionMethodConverterClass = + (Class<ProtocolVersionMethodConverter>) Class.forName("org.apache.qpid.framing.MethodConverter_"+protocolMajorVersion + "_" + protocolMinorVersion); + return versionMethodConverterClass.newInstance(); + + } + catch (ClassNotFoundException e) + { + _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion); + if(protocolMinorVersion != 0) + { + protocolMinorVersion--; + return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); + } + else if (protocolMajorVersion != 0) + { + protocolMajorVersion--; + return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); + } + else + { + return null; + } + + + } + catch (IllegalAccessException e) + { + throw new IllegalStateException("Unable to load protocol version converter: ", e); + } + catch (InstantiationException e) + { + throw new IllegalStateException("Unable to load protocol version converter: ", e); + } } public byte getProtocolMajorVersion() @@ -137,5 +182,15 @@ return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size); + } + + public ProtocolVersionMethodConverter getProtocolVersionMethodConverter() + { + return _protocolVersionConverter; + } + + public void configure() + { + _protocolVersionConverter.configure(); } } Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java?view=auto&rev=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java Tue Feb 20 08:20:41 2007 @@ -0,0 +1,26 @@ +package org.apache.qpid.framing.abstraction; + +public abstract class AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private final byte _protocolMajorVersion; + + + private final byte _protocolMinorVersion; + + public AbstractMethodConverter(byte major, byte minor) + { + _protocolMajorVersion = major; + _protocolMinorVersion = minor; + } + + + public final byte getProtocolMajorVersion() + { + return _protocolMajorVersion; + } + + public final byte getProtocolMinorVersion() + { + return _protocolMinorVersion; + } +} Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java?view=auto&rev=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java Tue Feb 20 08:20:41 2007 @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +import org.apache.mina.common.ByteBuffer; + +public interface ContentChunk +{ + int getSize(); + ByteBuffer getData(); + + void reduceToFit(); +} Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java?view=auto&rev=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java Tue Feb 20 08:20:41 2007 @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.AMQShortString; + +public interface MessagePublishInfo +{ + + public AMQShortString getExchange(); + + public boolean isImmediate(); + + public boolean isMandatory(); + + public AMQShortString getRoutingKey(); + +} Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java?view=auto&rev=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java Tue Feb 20 08:20:41 2007 @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.AMQMethodBody; + + +public interface MessagePublishInfoConverter +{ + public MessagePublishInfo convertToInfo(AMQMethodBody body); + public AMQMethodBody convertToBody(MessagePublishInfo info); + +} Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?view=auto&rev=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Tue Feb 20 08:20:41 2007 @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.AMQBody; + +public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter +{ + AMQBody convertToBody(ContentChunk contentBody); + ContentChunk convertToContentChunk(AMQBody body); + + void configure(); +} Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java Tue Feb 20 08:20:41 2007 @@ -154,7 +154,7 @@ public void testSetGetTimestamp() { - long timestamp = 999999999; + long timestamp = System.currentTimeMillis(); _testProperties.setTimestamp(timestamp); assertEquals(timestamp, _testProperties.getTimestamp()); } Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Tue Feb 20 08:20:41 2007 @@ -23,6 +23,8 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -103,16 +105,32 @@ for(int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; - // TODO: fix hardcoded protocol version data - TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - false, - false, - null, - 0), txnContext); + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); } _acked = acked; @@ -174,7 +192,7 @@ private final long _tag; private int _count; - TestMessage(long tag, long messageId, BasicPublishBody publishBody, TransactionalContext txnContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext) { super(messageId, publishBody, txnContext); _tag = tag; Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Feb 20 08:20:41 2007 @@ -23,6 +23,7 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; @@ -149,15 +150,97 @@ return headers; } - static BasicPublishBody getPublishRequest(String id) + + static final class MessagePublishInfoImpl implements MessagePublishInfo + { + private AMQShortString _exchange; + private boolean _immediate; + private boolean _mandatory; + private AMQShortString _routingKey; + + + public MessagePublishInfoImpl(AMQShortString routingKey) + { + _routingKey = routingKey; + } + + public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + public AMQShortString getExchange() + { + return _exchange; + } + + public boolean isImmediate() + { + return _immediate; + + } + + public boolean isMandatory() + { + return _mandatory; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + + public void setExchange(AMQShortString exchange) + { + _exchange = exchange; + } + + public void setImmediate(boolean immediate) + { + _immediate = immediate; + } + + public void setMandatory(boolean mandatory) + { + _mandatory = mandatory; + } + + public void setRoutingKey(AMQShortString routingKey) + { + _routingKey = routingKey; + } + } + + static MessagePublishInfo getPublishRequest(final String id) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null,false,false,new AMQShortString(id),0); - + MessagePublishInfo request = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return new AMQShortString(id); + } + }; + return request; } @@ -221,7 +304,7 @@ this(getPublishRequest(id), getContentHeader(headers), null); } - private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException + private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException { super(_messageStore.getNewMessageId(), publish, _txnContext, header); } @@ -265,7 +348,7 @@ { try { - return getPublishBody().routingKey; + return getMessagePublishInfo().getRoutingKey(); } catch (AMQException e) { Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Tue Feb 20 08:20:41 2007 @@ -22,7 +22,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; import org.apache.qpid.framing.BasicPublishBody; @@ -55,13 +54,13 @@ Message m7 = new Message("Message7", "XXXXX"); - BasicPublishBody pb7 = m7.getPublishBody(); - pb7.mandatory = true; + MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); + pb7.setMandatory(true); routeAndTest(m7,true); Message m8 = new Message("Message8", "F0000"); - BasicPublishBody pb8 = m8.getPublishBody(); - pb8.mandatory = true; + MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); + pb8.setMandatory(true); routeAndTest(m8,false,q1); @@ -88,10 +87,10 @@ bindDefault("F0000"); Message m1 = new Message("Message1", "XXXXX"); Message m2 = new Message("Message2", "F0000"); - BasicPublishBody pb1 = m1.getPublishBody(); - pb1.mandatory = true; - BasicPublishBody pb2 = m2.getPublishBody(); - pb2.mandatory = true; + MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); + pb1.setMandatory(true); + MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); + pb2.setMandatory(true); routeAndTest(m1,true); } Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Feb 20 08:20:41 2007 @@ -22,6 +22,7 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -164,20 +165,32 @@ } } - private AMQMessage message(boolean immediate) throws AMQException + private AMQMessage message(final boolean immediate) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - immediate, - false, - null, - 0); - + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = 1000; // in bytes return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue Feb 20 08:20:41 2007 @@ -27,6 +27,7 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.ack.UnacknowledgedMessage; @@ -98,15 +99,29 @@ { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publishBody = new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - new AMQShortString("someExchange"), - false, - false, - new AMQShortString("rk"), - 0); + MessagePublishInfo publishBody = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return new AMQShortString("someExchange"); + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return new AMQShortString("rk"); + } + }; AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext); if (persistent) { Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Tue Feb 20 08:20:41 2007 @@ -22,6 +22,8 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.StoreContext; @@ -57,20 +59,32 @@ return message(false); } - AMQMessage message(boolean immediate) throws AMQException + AMQMessage message(final boolean immediate) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - immediate, - false, - null, - 0); - + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, new ContentHeaderBody()); } Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Tue Feb 20 08:20:41 2007 @@ -24,9 +24,12 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -56,6 +59,26 @@ { } + public void createExchange(Exchange exchange) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeExchange(Exchange exchange) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void createQueue(AMQQueue queue) throws AMQException { } @@ -87,7 +110,7 @@ return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException + public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { } @@ -102,7 +125,7 @@ return null; } - public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException + public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException { return null; } Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Tue Feb 20 08:20:41 2007 @@ -25,6 +25,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -50,16 +52,32 @@ public void testMessageGetsRemoved() throws AMQException { createPersistentContentHeader(); - // TODO: fix hardcoded protocol version data - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - false, - false, - null, - 0), + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + AMQMessage message = new AMQMessage(_store.getNewMessageId(), info, new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); @@ -81,16 +99,33 @@ public void testMessageRemains() throws AMQException { - // TODO: fix hardcoded protocol version data - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - false, - false, - null, - 0), + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + AMQMessage message = new AMQMessage(_store.getNewMessageId(), + info, new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?view=diff&rev=509628&r1=509627&r2=509628 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Tue Feb 20 08:20:41 2007 @@ -22,6 +22,7 @@ import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.abstraction.ContentChunk; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -35,7 +36,7 @@ public TestableMemoryMessageStore() { _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); } public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() @@ -43,7 +44,7 @@ return _metaDataMap; } - public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap() + public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() { return _contentBodyMap; }
