Author: rgodfrey Date: Fri Oct 10 09:54:36 2014 New Revision: 1630745 URL: http://svn.apache.org/r1630745 Log: More refactoring
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Fri Oct 10 09:54:36 2014 @@ -30,8 +30,6 @@ import java.util.Arrays; import 
org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; @@ -241,8 +239,6 @@ public class BDBMessageStoreTest extends private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) { - MethodRegistry methodRegistry = new MethodRegistry(ProtocolVersion.v0_9); - int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); return new ContentHeaderBody(props, length); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Fri Oct 10 09:54:36 2014 @@ -133,6 +133,7 @@ public class AMQProtocolEngine implement /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); + private final FrameCreatingMethodProcessor _methodProcessor = new FrameCreatingMethodProcessor(_protocolVersion); 
private final List<Action<? super AMQProtocolEngine>> _taskList = new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>(); @@ -185,7 +186,7 @@ public class AMQProtocolEngine implement _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); - _decoder = new AMQDecoder(true, _methodRegistry); + _decoder = new AMQDecoder(true, _methodProcessor); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -296,10 +297,11 @@ public class AMQProtocolEngine implement _readBytes += msg.remaining(); _receivedLock.lock(); + List<AMQDataBlock> processedMethods = _methodProcessor.getProcessedMethods(); try { - final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); - for (AMQDataBlock dataBlock : dataBlocks) + _decoder.decodeBuffer(msg); + for (AMQDataBlock dataBlock : processedMethods) { try { @@ -320,6 +322,7 @@ public class AMQProtocolEngine implement break; } } + processedMethods.clear(); receivedComplete(); } catch (ConnectionScopedRuntimeException e) @@ -349,6 +352,7 @@ public class AMQProtocolEngine implement } finally { + processedMethods.clear(); _receivedLock.unlock(); } return null; @@ -1089,13 +1093,32 @@ public class AMQProtocolEngine implement private void closeConnection(int channelId, AMQConnectionException e) { - try + + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e); - } + _logger.info("Closing connection due to: " + e); + } + closeConnection(channelId, e.getCloseFrame()); + } + + + void closeConnection(AMQConstant errorCode, + String message, int channelId, + int classId, + int methodId) + { + + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + message); + } + closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), classId, methodId))); + } + private void 
closeConnection(int channelId, AMQFrame frame) + { + try + { markChannelAwaitingCloseOk(channelId); closeSession(); } @@ -1103,7 +1126,7 @@ public class AMQProtocolEngine implement { try { - writeFrame(e.getCloseFrame()); + writeFrame(frame); } finally { @@ -1208,6 +1231,7 @@ public class AMQProtocolEngine implement { _protocolVersion = pv; _methodRegistry.setProtocolVersion(_protocolVersion); + _methodProcessor.setProtocolVersion(_protocolVersion); _protocolOutputConverter = new ProtocolOutputConverterImpl(this); _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Oct 10 09:54:36 2014 @@ -23,8 +23,8 @@ package org.apache.qpid.client.protocol; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; @@ -193,7 +193,7 @@ public class AMQProtocolHandler implemen _connection = con; _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); - _decoder = new AMQDecoder(false, _protocolSession.getMethodRegistry()); + _decoder = new AMQDecoder(false, _protocolSession.getMethodProcessor()); 
_failoverHandler = new FailoverHandler(this); } @@ -459,9 +459,10 @@ public class AMQProtocolHandler implemen { _readBytes += msg.remaining(); _lastReadTime = System.currentTimeMillis(); + final List<AMQDataBlock> dataBlocks = _protocolSession.getMethodProcessor().getProcessedMethods(); try { - final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); // Decode buffer int size = dataBlocks.size(); @@ -511,6 +512,10 @@ public class AMQProtocolHandler implemen propagateExceptionToFrameListeners(e); exception(e); } + finally + { + dataBlocks.clear(); + } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Oct 10 09:54:36 2014 @@ -44,6 +44,7 @@ import org.apache.qpid.framing.AMQShortS import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; @@ -91,6 +92,9 @@ public class AMQProtocolSession implemen private final MethodRegistry _methodRegistry = new MethodRegistry(ProtocolVersion.getLatestSupportedVersion()); + private final FrameCreatingMethodProcessor _methodProcessor = + new 
FrameCreatingMethodProcessor(ProtocolVersion.getLatestSupportedVersion()); + private MethodDispatcher _methodDispatcher; private final AMQConnection _connection; @@ -416,7 +420,8 @@ public class AMQProtocolSession implemen _logger.debug("Setting ProtocolVersion to :" + pv); } _protocolVersion = pv; - _methodRegistry.setProtocolVersion(pv);; + _methodRegistry.setProtocolVersion(pv); + _methodProcessor.setProtocolVersion(pv); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); } @@ -549,4 +554,9 @@ public class AMQProtocolSession implemen { _protocolHandler.setMaxFrameSize(frameMax); } + + public FrameCreatingMethodProcessor getMethodProcessor() + { + return _methodProcessor; + } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Oct 10 09:54:36 2014 @@ -30,14 +30,13 @@ import java.util.ArrayList; import java.util.List; import java.util.ListIterator; -import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ByteArrayDataInput; import org.apache.qpid.framing.EncodingUtils; -import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.MethodProcessor; import org.apache.qpid.framing.ProtocolInitiation; /** @@ -54,7 +53,8 @@ import 
org.apache.qpid.framing.ProtocolI */ public class AMQDecoder { - private final MethodRegistry _registry; + private final MethodProcessor _methodProcessor; + /** Holds the 'normal' AMQP data decoder. */ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); @@ -73,12 +73,12 @@ public class AMQDecoder * Creates a new AMQP decoder. * * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation. - * @param registry method registry + * @param methodProcessor method processor */ - public AMQDecoder(boolean expectProtocolInitiation, MethodRegistry registry) + public AMQDecoder(boolean expectProtocolInitiation, MethodProcessor methodProcessor) { _expectProtocolInitiation = expectProtocolInitiation; - _registry = registry; + _methodProcessor = methodProcessor; } @@ -217,14 +217,13 @@ public class AMQDecoder } - public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { - // get prior remaining data from accumulator - ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); MarkableDataInput msg; + // get prior remaining data from accumulator ByteArrayInputStream bais; DataInput di; if(!_remainingBufs.isEmpty()) @@ -258,9 +257,7 @@ public class AMQDecoder enoughData = _dataBlockDecoder.decodable(msg); if (enoughData) { - dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_registry.getProtocolVersion(), - _registry.getMethodProcessor(), - msg)); + _dataBlockDecoder.processInput(_methodProcessor, msg); } } else @@ -268,7 +265,7 @@ public class AMQDecoder enoughData = _piDecoder.decodable(msg); if (enoughData) { - dataBlocks.add(new ProtocolInitiation(msg)); + _methodProcessor.receiveProtocolHeader(new ProtocolInitiation(msg)); } } @@ -305,6 +302,5 @@ public class AMQDecoder } } } - return dataBlocks; } } 
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Fri Oct 10 09:54:36 2014 @@ -35,7 +35,8 @@ public class AMQDataBlockDecoder private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); public AMQDataBlockDecoder() - { } + { + } public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException { @@ -52,9 +53,13 @@ public class AMQDataBlockDecoder // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() final long bodySize = in.readInt() & 0xffffffffL; - if(bodySize > _maxFrameSize) + if (bodySize > _maxFrameSize) { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Incoming frame size of "+bodySize+" is larger than negotiated maximum of " + _maxFrameSize); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Incoming frame size of " + + bodySize + + " is larger than negotiated maximum of " + + _maxFrameSize); } in.reset(); @@ -62,9 +67,8 @@ public class AMQDataBlockDecoder } - public <T> T createAndPopulateFrame(ProtocolVersion pv, - MethodProcessor<T> processor, - MarkableDataInput in) + public void processInput(MethodProcessor processor, + MarkableDataInput in) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { final byte type = in.readByte(); @@ -75,24 +79,24 @@ public class AMQDataBlockDecoder // bodySize can be zero if ((channel < 0) || (bodySize < 0)) { - throw 
new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Undecodable frame: type = " + type + " channel = " + channel - + " bodySize = " + bodySize); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); } - T result; - switch(type) + switch (type) { case 1: - result = processMethod(channel, in, processor, pv); + processMethod(channel, in, processor); break; case 2: - result = ContentHeaderBody.process(channel, in, processor, bodySize); + ContentHeaderBody.process(channel, in, processor, bodySize); break; case 3: - result = ContentBody.process(channel, in, processor, bodySize); + ContentBody.process(channel, in, processor, bodySize); break; case 8: - result = HeartbeatBody.process(channel, in, processor, bodySize); + HeartbeatBody.process(channel, in, processor, bodySize); break; default: throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); @@ -101,11 +105,11 @@ public class AMQDataBlockDecoder byte marker = in.readByte(); if ((marker & 0xFF) != 0xCE) { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "End of frame marker not found. Read " + marker + " length=" + bodySize - + " type=" + type); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "End of frame marker not found. 
Read " + marker + " length=" + bodySize + + " type=" + type); } - return result; } public void setMaxFrameSize(final int maxFrameSize) @@ -113,200 +117,277 @@ public class AMQDataBlockDecoder _maxFrameSize = maxFrameSize; } - private <T> T processMethod(int channelId, MarkableDataInput in, MethodProcessor<T> dispatcher, ProtocolVersion protocolVersion) + private void processMethod(int channelId, + MarkableDataInput in, + MethodProcessor dispatcher) throws AMQFrameDecodingException, IOException { final int classAndMethod = in.readInt(); - switch (classAndMethod) { //CONNECTION_CLASS: case 0x000a000a: - return ConnectionStartBody.process(in, dispatcher); + ConnectionStartBody.process(in, dispatcher); + break; case 0x000a000b: - return ConnectionStartOkBody.process(in, dispatcher); + ConnectionStartOkBody.process(in, dispatcher); + break; case 0x000a0014: - return ConnectionSecureBody.process(in, dispatcher); + ConnectionSecureBody.process(in, dispatcher); + break; case 0x000a0015: - return ConnectionSecureOkBody.process(in, dispatcher); + ConnectionSecureOkBody.process(in, dispatcher); + break; case 0x000a001e: - return ConnectionTuneBody.process(in, dispatcher); + ConnectionTuneBody.process(in, dispatcher); + break; case 0x000a001f: - return ConnectionTuneOkBody.process(in, dispatcher); + ConnectionTuneOkBody.process(in, dispatcher); + break; case 0x000a0028: - return ConnectionOpenBody.process(in, dispatcher); + ConnectionOpenBody.process(in, dispatcher); + break; case 0x000a0029: - return ConnectionOpenOkBody.process(in, dispatcher); + ConnectionOpenOkBody.process(in, dispatcher); + break; case 0x000a002a: - return ConnectionRedirectBody.process(in, dispatcher); + ConnectionRedirectBody.process(in, dispatcher); + break; case 0x000a0032: - if (protocolVersion.equals(ProtocolVersion.v8_0)) + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - return ConnectionRedirectBody.process(in, dispatcher); + ConnectionRedirectBody.process(in, dispatcher); } 
else { - return ConnectionCloseBody.process(in, dispatcher); + ConnectionCloseBody.process(in, dispatcher); } + break; case 0x000a0033: - if (protocolVersion.equals(ProtocolVersion.v8_0)) + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), + dispatcher.getProtocolVersion()); } else { - return dispatcher.connectionCloseOk(); + dispatcher.receiveConnectionCloseOk(); } + break; case 0x000a003c: - if (protocolVersion.equals(ProtocolVersion.v8_0)) + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - return ConnectionCloseBody.process(in, dispatcher); + ConnectionCloseBody.process(in, dispatcher); } else { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), + dispatcher.getProtocolVersion()); } + break; case 0x000a003d: - if (protocolVersion.equals(ProtocolVersion.v8_0)) + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - return dispatcher.connectionCloseOk(); + dispatcher.receiveConnectionCloseOk(); } else { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), + dispatcher.getProtocolVersion()); } + break; // CHANNEL_CLASS: case 0x0014000a: - return ChannelOpenBody.process(channelId, in, dispatcher); + ChannelOpenBody.process(channelId, in, dispatcher); + break; case 0x0014000b: - return ChannelOpenOkBody.process(channelId, in, protocolVersion, dispatcher); + ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); + break; case 0x00140014: - return ChannelFlowBody.process(channelId, in, dispatcher); + 
ChannelFlowBody.process(channelId, in, dispatcher); + break; case 0x00140015: - return ChannelFlowOkBody.process(channelId, in, dispatcher); + ChannelFlowOkBody.process(channelId, in, dispatcher); + break; case 0x0014001e: - return ChannelAlertBody.process(channelId, in, dispatcher); + ChannelAlertBody.process(channelId, in, dispatcher); + break; case 0x00140028: - return ChannelCloseBody.process(channelId, in, dispatcher); + ChannelCloseBody.process(channelId, in, dispatcher); + break; case 0x00140029: - return dispatcher.channelCloseOk(channelId); + dispatcher.receiveChannelCloseOk(channelId); + break; // ACCESS_CLASS: case 0x001e000a: - return AccessRequestBody.process(channelId, in, dispatcher); + AccessRequestBody.process(channelId, in, dispatcher); + break; case 0x001e000b: - return AccessRequestOkBody.process(channelId, in, dispatcher); + AccessRequestOkBody.process(channelId, in, dispatcher); + break; // EXCHANGE_CLASS: case 0x0028000a: - return ExchangeDeclareBody.process(channelId, in, dispatcher); + ExchangeDeclareBody.process(channelId, in, dispatcher); + break; case 0x0028000b: - return dispatcher.exchangeDeclareOk(channelId); + dispatcher.receiveExchangeDeclareOk(channelId); + break; case 0x00280014: - return ExchangeDeleteBody.process(channelId, in, dispatcher); + ExchangeDeleteBody.process(channelId, in, dispatcher); + break; case 0x00280015: - return dispatcher.exchangeDeleteOk(channelId); + dispatcher.receiveExchangeDeleteOk(channelId); + break; case 0x00280016: - return ExchangeBoundBody.process(channelId, in, dispatcher); + ExchangeBoundBody.process(channelId, in, dispatcher); + break; case 0x00280017: - return ExchangeBoundOkBody.process(channelId, in, dispatcher); + ExchangeBoundOkBody.process(channelId, in, dispatcher); + break; // QUEUE_CLASS: case 0x0032000a: - return QueueDeclareBody.process(channelId, in, dispatcher); + QueueDeclareBody.process(channelId, in, dispatcher); + break; case 0x0032000b: - return 
QueueDeclareOkBody.process(channelId, in, dispatcher); + QueueDeclareOkBody.process(channelId, in, dispatcher); + break; case 0x00320014: - return QueueBindBody.process(channelId, in, dispatcher); + QueueBindBody.process(channelId, in, dispatcher); + break; case 0x00320015: - return dispatcher.queueBindOk(channelId); + dispatcher.receiveQueueBindOk(channelId); + break; case 0x0032001e: - return QueuePurgeBody.process(channelId, in, dispatcher); + QueuePurgeBody.process(channelId, in, dispatcher); + break; case 0x0032001f: - return QueuePurgeOkBody.process(channelId, in, dispatcher); + QueuePurgeOkBody.process(channelId, in, dispatcher); + break; case 0x00320028: - return QueueDeleteBody.process(channelId, in, dispatcher); + QueueDeleteBody.process(channelId, in, dispatcher); + break; case 0x00320029: - return QueueDeleteOkBody.process(channelId, in, dispatcher); + QueueDeleteOkBody.process(channelId, in, dispatcher); + break; case 0x00320032: - return QueueUnbindBody.process(channelId, in, dispatcher); + QueueUnbindBody.process(channelId, in, dispatcher); + break; case 0x00320033: - return dispatcher.queueUnbindOk(channelId); + dispatcher.receiveQueueUnbindOk(channelId); + break; // BASIC_CLASS: case 0x003c000a: - return BasicQosBody.process(channelId, in, dispatcher); + BasicQosBody.process(channelId, in, dispatcher); + break; case 0x003c000b: - return dispatcher.basicQosOk(channelId); + dispatcher.receiveBasicQosOk(channelId); + break; case 0x003c0014: - return BasicConsumeBody.process(channelId, in, dispatcher); + BasicConsumeBody.process(channelId, in, dispatcher); + break; case 0x003c0015: - return BasicConsumeOkBody.process(channelId, in, dispatcher); + BasicConsumeOkBody.process(channelId, in, dispatcher); + break; case 0x003c001e: - return BasicCancelBody.process(channelId, in, dispatcher); + BasicCancelBody.process(channelId, in, dispatcher); + break; case 0x003c001f: - return BasicCancelOkBody.process(channelId, in, dispatcher); + 
BasicCancelOkBody.process(channelId, in, dispatcher); + break; case 0x003c0028: - return BasicPublishBody.process(channelId, in, dispatcher); + BasicPublishBody.process(channelId, in, dispatcher); + break; case 0x003c0032: - return BasicReturnBody.process(channelId, in, dispatcher); + BasicReturnBody.process(channelId, in, dispatcher); + break; case 0x003c003c: - return BasicDeliverBody.process(channelId, in, dispatcher); + BasicDeliverBody.process(channelId, in, dispatcher); + break; case 0x003c0046: - return BasicGetBody.process(channelId, in, dispatcher); + BasicGetBody.process(channelId, in, dispatcher); + break; case 0x003c0047: - return BasicGetOkBody.process(channelId, in, dispatcher); + BasicGetOkBody.process(channelId, in, dispatcher); + break; case 0x003c0048: - return BasicGetEmptyBody.process(channelId, in, dispatcher); + BasicGetEmptyBody.process(channelId, in, dispatcher); + break; case 0x003c0050: - return BasicAckBody.process(channelId, in, dispatcher); + BasicAckBody.process(channelId, in, dispatcher); + break; case 0x003c005a: - return BasicRejectBody.process(channelId, in, dispatcher); + BasicRejectBody.process(channelId, in, dispatcher); + break; case 0x003c0064: - return BasicRecoverBody.process(channelId, in, protocolVersion, dispatcher); + BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); + break; case 0x003c0065: - return dispatcher.basicRecoverSyncOk(channelId); + dispatcher.receiveBasicRecoverSyncOk(channelId); + break; case 0x003c0066: - return BasicRecoverSyncBody.process(channelId, in, dispatcher); + BasicRecoverSyncBody.process(channelId, in, dispatcher); + break; case 0x003c006e: - return BasicRecoverSyncBody.process(channelId, in, dispatcher); + BasicRecoverSyncBody.process(channelId, in, dispatcher); + break; case 0x003c006f: - return dispatcher.basicRecoverSyncOk(channelId); + dispatcher.receiveBasicRecoverSyncOk(channelId); + break; // TX_CLASS: case 0x005a000a: - return 
dispatcher.txSelect(channelId); + dispatcher.receiveTxSelect(channelId); + break; case 0x005a000b: - return dispatcher.txSelectOk(channelId); + dispatcher.receiveTxSelectOk(channelId); + break; case 0x005a0014: - return dispatcher.txCommit(channelId); + dispatcher.receiveTxCommit(channelId); + break; case 0x005a0015: - return dispatcher.txCommitOk(channelId); + dispatcher.receiveTxCommitOk(channelId); + break; case 0x005a001e: - return dispatcher.txRollback(channelId); + dispatcher.receiveTxRollback(channelId); + break; case 0x005a001f: - return dispatcher.txRollbackOk(channelId); + dispatcher.receiveTxRollbackOk(channelId); + break; default: - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), + dispatcher.getProtocolVersion()); } } - private AMQFrameDecodingException newUnknownMethodException(final int classId, final int methodId, ProtocolVersion protocolVersion) + private AMQFrameDecodingException newUnknownMethodException(final int classId, + final int methodId, + ProtocolVersion protocolVersion) { return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID, - "Method " + methodId + " unknown in AMQP version " + protocolVersion - + " (while trying to decode class " + classId + " method " + methodId + "."); + "Method " + + methodId + + " unknown in AMQP version " + + protocolVersion + + " (while trying to decode class " + + classId + + " method " + + methodId + + "."); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java Fri Oct 10 09:54:36 2014 @@ -165,9 +165,9 @@ public class AccessRequestBody extends A return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString realm = buffer.readAMQShortString(); byte bitfield = buffer.readByte(); @@ -176,6 +176,6 @@ public class AccessRequestBody extends A boolean active = (bitfield & 0x04) == 0x4 ; boolean write = (bitfield & 0x08) == 0x8 ; boolean read = (bitfield & 0x10) == 0x10 ; - return dispatcher.accessRequest(channelId, realm, exclusive, passive, active, write, read); + dispatcher.receiveAccessRequest(channelId, realm, exclusive, passive, active, write, read); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java Fri Oct 10 09:54:36 2014 @@ -95,10 +95,10 @@ public class AccessRequestOkBody extends return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + public static void 
process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); - return dispatcher.accessRequestOk(channelId, ticket); + dispatcher.receiveAccessRequestOk(channelId, ticket); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java Fri Oct 10 09:54:36 2014 @@ -112,13 +112,13 @@ public class BasicAckBody extends AMQMet return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean multiple = (buffer.readByte() & 0x01) != 0; - return dispatcher.basicAck(channelId, deliveryTag, multiple); + dispatcher.receiveBasicAck(channelId, deliveryTag, multiple); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java Fri Oct 10 09:54:36 2014 @@ -113,13 +113,13 @@ public class BasicCancelBody extends AMQ return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); boolean noWait = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.basicCancel(channelId, consumerTag, noWait); + dispatcher.receiveBasicCancel(channelId, consumerTag, noWait); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java Fri Oct 10 09:54:36 2014 @@ -96,10 +96,10 @@ public class BasicCancelOkBody extends A return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput in, final MethodProcessor<T> dispatcher) + public static void process(final int channelId, final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = in.readAMQShortString(); - return dispatcher.basicCancelOk(channelId, consumerTag); + 
dispatcher.receiveBasicCancelOk(channelId, consumerTag); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java Fri Oct 10 09:54:36 2014 @@ -191,7 +191,7 @@ public class BasicConsumeBody extends AM return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { @@ -205,6 +205,6 @@ public class BasicConsumeBody extends AM boolean exclusive = (bitfield & 0x04) == 0x04; boolean nowait = (bitfield & 0x08) == 0x08; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.basicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); + dispatcher.receiveBasicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java Fri Oct 10 09:54:36 2014 @@ -96,10 +96,10 @@ public class BasicConsumeOkBody extends return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); - return dispatcher.basicConsumeOk(channelId, consumerTag); + dispatcher.receiveBasicConsumeOk(channelId, consumerTag); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java Fri Oct 10 09:54:36 2014 @@ -152,9 +152,9 @@ public class BasicDeliverBody extends AM return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); @@ -162,6 +162,6 @@ public class BasicDeliverBody extends AM boolean redelivered = (buffer.readByte() & 0x01) != 0; AMQShortString 
exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); - return dispatcher.basicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey); + dispatcher.receiveBasicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java Fri Oct 10 09:54:36 2014 @@ -125,13 +125,13 @@ public class BasicGetBody extends AMQMet return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); AMQShortString queue = buffer.readAMQShortString(); boolean noAck = (buffer.readByte() & 0x01) != 0; - return dispatcher.basicGet(channelId, queue, noAck); + dispatcher.receiveBasicGet(channelId, queue, noAck); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java Fri Oct 10 09:54:36 2014 @@ -96,11 +96,11 @@ public class BasicGetEmptyBody extends A return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString clusterId = buffer.readAMQShortString(); - return dispatcher.basicGetEmpty(channelId); + dispatcher.receiveBasicGetEmpty(channelId); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java Fri Oct 10 09:54:36 2014 @@ -151,15 +151,15 @@ public class BasicGetOkBody extends AMQM return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean redelivered = (buffer.readByte() & 0x01) != 0; AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); long messageCount = 
EncodingUtils.readUnsignedInteger(buffer); - return dispatcher.basicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount); + dispatcher.receiveBasicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java Fri Oct 10 09:54:36 2014 @@ -151,9 +151,9 @@ public class BasicPublishBody extends AM return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); @@ -163,6 +163,6 @@ public class BasicPublishBody extends AM boolean mandatory = (bitfield & 0x01) != 0; boolean immediate = (bitfield & 0x02) != 0; - return dispatcher.basicPublish(channelId, exchange, routingKey, mandatory, immediate); + dispatcher.receiveBasicPublish(channelId, exchange, routingKey, mandatory, immediate); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff 
============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java Fri Oct 10 09:54:36 2014 @@ -124,14 +124,14 @@ public class BasicQosBody extends AMQMet return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long prefetchSize = EncodingUtils.readUnsignedInteger(buffer); int prefetchCount = buffer.readUnsignedShort(); boolean global = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.basicQos(channelId, prefetchSize, prefetchCount, global); + dispatcher.receiveBasicQos(channelId, prefetchSize, prefetchCount, global); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java Fri Oct 10 09:54:36 2014 @@ -100,14 +100,14 @@ public class BasicRecoverBody extends AM return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput in, final ProtocolVersion protocolVersion, - final MethodProcessor<T> dispatcher) throws IOException + 
final MethodProcessor dispatcher) throws IOException { boolean requeue = (in.readByte() & 0x01) == 0x01; boolean sync = (ProtocolVersion.v8_0.equals(protocolVersion)); - return dispatcher.basicRecover(channelId, requeue, sync); + dispatcher.receiveBasicRecover(channelId, requeue, sync); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java Fri Oct 10 09:54:36 2014 @@ -103,11 +103,11 @@ public class BasicRecoverSyncBody extend return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput in, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { boolean requeue = (in.readByte() & 0x01) == 0x01; - return dispatcher.basicRecover(channelId, requeue, true); + dispatcher.receiveBasicRecover(channelId, requeue, true); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java Fri Oct 10 09:54:36 2014 @@ -112,13 +112,13 @@ public class BasicRejectBody extends AMQ return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean requeue = (buffer.readByte() & 0x01) != 0; - return dispatcher.basicReject(channelId, deliveryTag, requeue); + dispatcher.receiveBasicReject(channelId, deliveryTag, requeue); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java Fri Oct 10 09:54:36 2014 @@ -134,15 +134,15 @@ public class BasicReturnBody extends AMQ return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); AMQShortString exchange = buffer.readAMQShortString(); 
AMQShortString routingKey = buffer.readAMQShortString(); - return dispatcher.basicReturn(channelId, replyCode, replyText, exchange, routingKey); + dispatcher.receiveBasicReturn(channelId, replyCode, replyText, exchange, routingKey); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java Fri Oct 10 09:54:36 2014 @@ -121,13 +121,13 @@ public class ChannelAlertBody extends AM return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); FieldTable details = EncodingUtils.readFieldTable(buffer); - return dispatcher.channelAlert(channelId, replyCode, replyText, details); + dispatcher.receiveChannelAlert(channelId, replyCode, replyText, details); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== 
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java Fri Oct 10 09:54:36 2014 @@ -132,15 +132,15 @@ public class ChannelCloseBody extends AM return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); int classId = buffer.readUnsignedShort(); int methodId = buffer.readUnsignedShort(); - return dispatcher.channelClose(channelId, replyCode, replyText, classId, methodId); + dispatcher.receiveChannelClose(channelId, replyCode, replyText, classId, methodId); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java Fri Oct 10 09:54:36 2014 @@ -92,11 +92,11 @@ public class ChannelFlowBody extends AMQ return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { boolean active = 
(buffer.readByte() & 0x01) == 0x01; - return dispatcher.channelFlow(channelId, active); + dispatcher.receiveChannelFlow(channelId, active); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java Fri Oct 10 09:54:36 2014 @@ -93,10 +93,10 @@ public class ChannelFlowOkBody extends A return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { boolean active = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.channelFlowOk(channelId, active); + dispatcher.receiveChannelFlowOk(channelId, active); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java Fri Oct 10 09:54:36 
2014 @@ -82,11 +82,11 @@ public class ChannelOpenBody extends AMQ return "[ChannelOpenBody] "; } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { buffer.readAMQShortString(); - return dispatcher.channelOpen(channelId); + dispatcher.receiveChannelOpen(channelId); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java Fri Oct 10 09:54:36 2014 @@ -96,16 +96,16 @@ public class ChannelOpenOkBody extends A return "[ChannelOpenOkBody]"; } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput in, final ProtocolVersion protocolVersion, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { if(!ProtocolVersion.v8_0.equals(protocolVersion)) { EncodingUtils.readBytes(in); } - return dispatcher.channelOpenOk(channelId); + dispatcher.receiveChannelOpenOk(channelId); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java Fri Oct 10 09:54:36 2014 @@ -134,12 +134,12 @@ public class ConnectionCloseBody extends return buf.toString(); } - public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); int classId = buffer.readUnsignedShort(); int methodId = buffer.readUnsignedShort(); - return dispatcher.connectionClose(replyCode, replyText, classId, methodId); + dispatcher.receiveConnectionClose(replyCode, replyText, classId, methodId); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java Fri Oct 10 09:54:36 2014 @@ -121,12 +121,12 @@ public class ConnectionOpenBody extends return buf.toString(); } - public 
static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString virtualHost = buffer.readAMQShortString(); AMQShortString capabilities = buffer.readAMQShortString(); boolean insist = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.connectionOpen(virtualHost, capabilities, insist); + dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java Fri Oct 10 09:54:36 2014 @@ -96,10 +96,10 @@ public class ConnectionOpenOkBody extend return buf.toString(); } - public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString knownHosts = buffer.readAMQShortString(); - return dispatcher.connectionOpenOk(knownHosts); + dispatcher.receiveConnectionOpenOk(knownHosts); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java Fri Oct 10 09:54:36 2014 @@ -108,10 +108,10 @@ public class ConnectionRedirectBody exte return buf.toString(); } - public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString host = buffer.readAMQShortString(); AMQShortString knownHosts = buffer.readAMQShortString(); - return dispatcher.connectionRedirect(host, knownHosts); + dispatcher.receiveConnectionRedirect(host, knownHosts); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java Fri Oct 10 09:54:36 2014 @@ -96,11 +96,11 @@ public class ConnectionSecureBody extend return buf.toString(); } - public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) + public static 
void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { byte[] challenge = EncodingUtils.readBytes(in); - return dispatcher.connectionSecure(challenge); + dispatcher.receiveConnectionSecure(challenge); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java Fri Oct 10 09:54:36 2014 @@ -96,9 +96,9 @@ public class ConnectionSecureOkBody exte return buf.toString(); } - public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) throws IOException + public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException { byte[] response = EncodingUtils.readBytes(in); - return dispatcher.connectionSecureOk(response); + dispatcher.receiveConnectionSecureOk(response); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java (original) +++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java Fri Oct 10 09:54:36 2014 @@ -136,7 +136,7 @@ public class ConnectionStartBody extends return buf.toString(); } - public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) + public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { short versionMajor = (short) in.readUnsignedByte(); @@ -146,6 +146,6 @@ public class ConnectionStartBody extends byte[] locales = EncodingUtils.readBytes(in); - return dispatcher.connectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); + dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java Fri Oct 10 09:54:36 2014 @@ -126,7 +126,7 @@ public class ConnectionStartOkBody exten return buf.toString(); } - public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) + public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { @@ -135,6 +135,6 @@ public class ConnectionStartOkBody exten byte[] response = EncodingUtils.readBytes(in); AMQShortString locale = 
in.readAMQShortString(); - return dispatcher.connectionStartOk(clientProperties, mechanism, response, locale); + dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java Fri Oct 10 09:54:36 2014 @@ -119,12 +119,12 @@ public class ConnectionTuneBody extends return buf.toString(); } - public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int channelMax = buffer.readUnsignedShort(); long frameMax = EncodingUtils.readUnsignedInteger(buffer); int heartbeat = buffer.readUnsignedShort(); - return dispatcher.connectionTune(channelMax, frameMax, heartbeat); + dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org