QPID-8038: [Broker-J][AMQP 0-10] Add protocol tests for AMQP 0-10
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ff2980e2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ff2980e2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ff2980e2 Branch: refs/heads/master Commit: ff2980e2d6e9520ba204acd41a78e9ee412a2c11 Parents: 612c2cb Author: Alex Rudyy <oru...@apache.org> Authored: Tue Nov 21 17:16:42 2017 +0000 Committer: Alex Rudyy <oru...@apache.org> Committed: Wed Nov 22 15:02:23 2017 +0000 ---------------------------------------------------------------------- pom.xml | 47 ++-- systests/protocol-tests-amqp-0-10/pom.xml | 109 +++++++ .../qpid/tests/protocol/v0_10/Assembler.java | 264 +++++++++++++++++ .../protocol/v0_10/ConnectionInteraction.java | 83 ++++++ .../qpid/tests/protocol/v0_10/Disassembler.java | 281 +++++++++++++++++++ .../tests/protocol/v0_10/ErrorResponse.java | 40 +++ .../protocol/v0_10/ExecutionInteraction.java | 47 ++++ .../qpid/tests/protocol/v0_10/FrameDecoder.java | 190 +++++++++++++ .../qpid/tests/protocol/v0_10/FrameEncoder.java | 87 ++++++ .../tests/protocol/v0_10/FrameTransport.java | 58 ++++ .../qpid/tests/protocol/v0_10/Interaction.java | 138 +++++++++ .../protocol/v0_10/MessageInteraction.java | 147 ++++++++++ .../protocol/v0_10/PerformativeResponse.java | 48 ++++ .../protocol/v0_10/ProtocolEventReceiver.java | 67 +++++ .../protocol/v0_10/SessionInteraction.java | 89 ++++++ .../tests/protocol/v0_10/TxInteraction.java | 60 ++++ .../resources/config-protocol-tests-0-10.json | 78 +++++ .../tests/protocol/v0_10/ConnectionTest.java | 214 ++++++++++++++ .../qpid/tests/protocol/v0_10/MessageTest.java | 260 +++++++++++++++++ .../qpid/tests/protocol/v0_10/SessionTest.java | 123 ++++++++ .../tests/protocol/v0_10/TransactionTest.java | 92 ++++++ .../protocol/v0_8/ExchangeInteraction.java | 58 ++++ .../tests/protocol/v0_8/FrameTransport.java | 12 +- .../qpid/tests/protocol/v0_8/Interaction.java | 23 +- .../qpid/tests/protocol/v0_8/TxInteraction.java | 44 +++ .../qpid/tests/protocol/v0_8/ChannelTest.java | 2 +- .../tests/protocol/v0_8/ConnectionTest.java | 26 +- .../tests/protocol/v0_8/TransactionTest.java | 79 ++++++ .../tests/protocol/v1_0/FrameTransport.java | 4 +- .../qpid/tests/protocol/v1_0/Interaction.java | 9 +- .../tests/protocol/AbstractFrameTransport.java | 175 ++++++++++++ .../tests/protocol/AbstractInteraction.java | 150 ++++++++++ .../tests/protocol/ChannelClosedResponse.java | 36 +++ .../qpid/tests/protocol/FrameTransport.java | 190 ------------- .../apache/qpid/tests/protocol/Interaction.java | 147 ---------- 35 files changed, 3097 insertions(+), 380 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e7d58bb..0c0c445 100644 --- a/pom.xml +++ b/pom.xml @@ -198,6 +198,7 @@ <module>systests/qpid-systests-jms_2.0</module> <module>systests/protocol-tests-core</module> <module>systests/protocol-tests-amqp-0-8</module> + <module>systests/protocol-tests-amqp-0-10</module> <module>systests/protocol-tests-amqp-1-0</module> <module>systests/end-to-end-conversion-tests</module> <module>perftests</module> @@ -427,6 +428,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>protocol-tests-amqp-0-10</artifactId> + <version>${project.version}</version> + </dependency> + <!-- External dependencies --> <dependency> <groupId>org.apache.qpid</groupId> @@ -1332,26 +1339,26 @@ </properties> </profile> - <profile> - <id>java-json.0-10</id> - <activation> - <property> - <name>profile</name> - <value>java-json.0-10</value> - </property> - </activation> - <properties> - <profile>java-json.0-10</profile> - <profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes> - <profile.broker.version>v0_10</profile.broker.version> - <profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols> - <profile.broker.persistent>true</profile.broker.persistent> - <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type> - <profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint> - </properties> - </profile> - - <profile> + <profile> + <id>java-json.0-10</id> + <activation> + <property> + <name>profile</name> + <value>java-json.0-10</value> + </property> + </activation> + <properties> + <profile>java-json.0-10</profile> + <profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes> + <profile.broker.version>v0_10</profile.broker.version> + <profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols> + <profile.broker.persistent>true</profile.broker.persistent> + <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type> + <profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint> + </properties> + </profile> + + <profile> <id>cpp</id> <activation> <property> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/pom.xml ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/pom.xml b/systests/protocol-tests-amqp-0-10/pom.xml new file mode 100644 index 0000000..3acf129 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/pom.xml @@ -0,0 +1,109 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-systests-parent</artifactId> + <version>7.1.0-SNAPSHOT</version> + <relativePath>../../qpid-systests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>protocol-tests-amqp-0-10</artifactId> + <name>Apache Qpid Protocol Tests for AMQP 0-10</name> + <description>Tests for AMQP 0-10</description> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-test-utils</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>protocol-tests-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-systests-utils</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-logging-logback</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-memory-store</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-derby-store</artifactId> + <optional>true</optional> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-bdbstore</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-integration</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <qpid.initialConfigurationLocation>classpath:config-protocol-tests-0-10.json</qpid.initialConfigurationLocation> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java new file mode 100644 index 0000000..aac39b6 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java @@ -0,0 +1,264 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder; +import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties; +import org.apache.qpid.server.protocol.v0_10.transport.Frame; +import org.apache.qpid.server.protocol.v0_10.transport.Header; +import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; +import org.apache.qpid.server.protocol.v0_10.transport.Method; +import org.apache.qpid.server.protocol.v0_10.transport.NetworkDelegate; +import org.apache.qpid.server.protocol.v0_10.transport.NetworkEvent; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader; +import org.apache.qpid.server.protocol.v0_10.transport.Struct; + +public class Assembler implements NetworkDelegate +{ + + private static final int ARRAY_SIZE = 0xFF; + private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1]; + private final Map<Integer, Method> _incompleteMethodMap = new HashMap<>(); + + private final ProtocolEventReceiver receiver; + private final Map<Integer, List<Frame>> segments; + private static final ThreadLocal<BBDecoder> _decoder = ThreadLocal.withInitial(BBDecoder::new); + + Assembler(ProtocolEventReceiver receiver) + { + this.receiver = receiver; + segments = new HashMap<>(); + } + + private int segmentKey(Frame frame) + { + return (frame.getTrack() + 1) * frame.getChannel(); + } + + private List<Frame> getSegment(Frame frame) + { + return segments.get(segmentKey(frame)); + } + + private void setSegment(Frame frame, List<Frame> segment) + { + int key = segmentKey(frame); + if (segments.containsKey(key)) + { + error(new ProtocolError(Frame.L2, "segment in progress: %s", + frame)); + } + segments.put(segmentKey(frame), segment); + } + + private void clearSegment(Frame frame) + { + segments.remove(segmentKey(frame)); + } + + private void emit(int channel, ProtocolEvent event) + { + event.setChannel(channel); + receiver.received(event); + } + + void received(NetworkEvent event) + { + event.delegate(this); + } + + public void init(ProtocolHeader header) + { + emit(0, header); + } + + public void error(ProtocolError error) + { + emit(0, error); + } + + public void frame(Frame frame) + { + ByteBuffer segment; + if (frame.isFirstFrame() && frame.isLastFrame()) + { + segment = frame.getBody(); + assemble(frame, segment); + } + else + { + List<Frame> frames; + if (frame.isFirstFrame()) + { + frames = new ArrayList<>(); + setSegment(frame, frames); + } + else + { + frames = getSegment(frame); + } + + frames.add(frame); + + if (frame.isLastFrame()) + { + clearSegment(frame); + + int size = 0; + for (Frame f : frames) + { + size += f.getSize(); + } + segment = allocateByteBuffer(size); + for (Frame f : frames) + { + segment.put(f.getBody()); + } + segment.flip(); + assemble(frame, segment); + } + } + } + + private ByteBuffer allocateByteBuffer(final int size) + { + return ByteBuffer.allocate(size); + } + + private void assemble(Frame frame, ByteBuffer segment) + { + BBDecoder dec = _decoder.get(); + dec.init(segment); + + int channel = frame.getChannel(); + Method command; + + switch (frame.getType()) + { + case CONTROL: + int controlType = dec.readUint16(); + Method control = Method.create(controlType); + control.read(dec); + emit(channel, control); + break; + case COMMAND: + int commandType = dec.readUint16(); + // read in the session header, right now we don't use it + int hdr = dec.readUint16(); + command = Method.create(commandType); + command.setSync((0x0001 & hdr) != 0); + command.read(dec); + if (command.hasPayload() && !frame.isLastSegment()) + { + setIncompleteCommand(channel, command); + } + else + { + emit(channel, command); + } + break; + case HEADER: + command = getIncompleteCommand(channel); + List<Struct> structs = null; + DeliveryProperties deliveryProps = null; + MessageProperties messageProps = null; + + while (dec.hasRemaining()) + { + Struct struct = dec.readStruct32(); + if (struct instanceof DeliveryProperties && deliveryProps == null) + { + deliveryProps = (DeliveryProperties) struct; + } + else if (struct instanceof MessageProperties && messageProps == null) + { + messageProps = (MessageProperties) struct; + } + else + { + if (structs == null) + { + structs = new ArrayList<>(2); + } + structs.add(struct); + } + } + command.setHeader(new Header(deliveryProps, messageProps, structs)); + + if (frame.isLastSegment()) + { + setIncompleteCommand(channel, null); + emit(channel, command); + } + break; + case BODY: + command = getIncompleteCommand(channel); + command.setBody(QpidByteBuffer.wrap(segment)); + setIncompleteCommand(channel, null); + emit(channel, command); + break; + default: + throw new IllegalStateException("unknown frame type: " + frame.getType()); + } + + dec.releaseBuffer(); + } + + private void setIncompleteCommand(int channelId, Method incomplete) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + _incompleteMethodArray[channelId] = incomplete; + } + else + { + if (incomplete != null) + { + _incompleteMethodMap.put(channelId, incomplete); + } + else + { + _incompleteMethodMap.remove(channelId); + } + } + } + + private Method getIncompleteCommand(int channelId) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + return _incompleteMethodArray[channelId]; + } + else + { + return _incompleteMethodMap.get(channelId); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java new file mode 100644 index 0000000..d7b54b0 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpen; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStartOk; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTuneOk; + +public class ConnectionInteraction +{ + public static final String SASL_MECHANISM_ANONYMOUS = "ANONYMOUS"; + public static final String SASL_MECHANISM_PLAIN = "PLAIN"; + + private final Interaction _interaction; + private ConnectionStartOk _startOk; + private ConnectionTuneOk _tuneOk; + private ConnectionOpen _open; + + public ConnectionInteraction(final Interaction interaction) + { + _interaction = interaction; + _startOk = new ConnectionStartOk(); + _tuneOk = new ConnectionTuneOk(); + _open = new ConnectionOpen(); + } + + public Interaction startOk() throws Exception + { + return _interaction.sendPerformative(_startOk); + } + + public ConnectionInteraction startOkMechanism(final String mechanism) + { + _startOk.setMechanism(mechanism); + return this; + } + + public Interaction tuneOk() throws Exception + { + return _interaction.sendPerformative(_tuneOk); + } + + public Interaction open() throws Exception + { + return _interaction.sendPerformative(_open); + } + + public ConnectionInteraction tuneOkChannelMax(final int channelMax) + { + _tuneOk.setChannelMax(channelMax); + return this; + } + + public ConnectionInteraction tuneOkMaxFrameSize(final int maxFrameSize) + { + _tuneOk.setMaxFrameSize(maxFrameSize); + return this; + } + + public ConnectionInteraction startOkResponse(final byte[] response) + { + _startOk.setResponse(response); + return this; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java new file mode 100644 index 0000000..e60049e --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java @@ -0,0 +1,281 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import static java.lang.Math.min; +import static org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_FRAME; +import static org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_SEG; +import static org.apache.qpid.server.protocol.v0_10.transport.Frame.HEADER_SIZE; +import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_FRAME; +import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_SEG; + +import java.nio.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v0_10.FrameSizeObserver; +import org.apache.qpid.server.protocol.v0_10.ProtocolEventSender; +import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder; +import org.apache.qpid.server.protocol.v0_10.transport.Frame; +import org.apache.qpid.server.protocol.v0_10.transport.Header; +import org.apache.qpid.server.protocol.v0_10.transport.Method; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolDelegate; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader; +import org.apache.qpid.server.protocol.v0_10.transport.SegmentType; +import org.apache.qpid.server.protocol.v0_10.transport.Struct; +import org.apache.qpid.server.transport.ByteBufferSender; + +/** + * Disassembler + */ +public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver +{ + private static final Logger LOGGER = LoggerFactory.getLogger(Disassembler.class); + private final ByteBufferSender _sender; + private final Object _sendlock = new Object(); + private volatile int _maxPayload; + private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>() + { + public BBEncoder initialValue() + { + return new BBEncoder(4 * 1024); + } + }; + + public Disassembler(ByteBufferSender sender, int maxFrame) + { + _sender = sender; + if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + _maxPayload = maxFrame - HEADER_SIZE; + } + + public void send(ProtocolEvent event) + { + event.delegate(null, this); + } + + public void flush() + { + synchronized (_sendlock) + { + _sender.flush(); + } + } + + public void close() + { + synchronized (_sendlock) + { + _sender.close(); + } + } + + public void init(Void v, ProtocolHeader header) + { + synchronized (_sendlock) + { + _sender.send(header.toByteBuffer()); + _sender.flush(); + } + } + + public void control(Void v, Method method) + { + method(method, SegmentType.CONTROL); + } + + public void command(Void v, Method method) + { + method(method, SegmentType.COMMAND); + } + + private void method(Method method, SegmentType type) + { + BBEncoder enc = _encoder.get(); + enc.init(); + enc.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.isSync()) + { + enc.writeUint16(0x0101); + } + else + { + enc.writeUint16(0x0100); + } + } + method.write(enc); + int methodLimit = enc.position(); + + byte flags = FIRST_SEG; + + boolean payload = method.hasPayload(); + if (!payload) + { + flags |= LAST_SEG; + } + + int headerLimit = -1; + if (payload) + { + final Header hdr = method.getHeader(); + if (hdr != null) + { + if(hdr.getDeliveryProperties() != null) + { + enc.writeStruct32(hdr.getDeliveryProperties()); + } + if(hdr.getMessageProperties() != null) + { + enc.writeStruct32(hdr.getMessageProperties()); + } + if(hdr.getNonStandardProperties() != null) + { + for (Struct st : hdr.getNonStandardProperties()) + { + enc.writeStruct32(st); + } + } + } + headerLimit = enc.position(); + } + + synchronized (_sendlock) + { + ByteBuffer buf = enc.underlyingBuffer(); + buf.flip(); + ByteBuffer copy = ByteBuffer.allocate(buf.remaining()); + copy.put(buf.duplicate()); + copy.flip(); + + final ByteBuffer methodBuf = view(copy,0, methodLimit); + fragment(flags, type, method, methodBuf); + if (payload) + { + QpidByteBuffer qpidByteBuffer = method.getBody(); + ByteBuffer body = null; + if (qpidByteBuffer != null) + { + body = ByteBuffer.allocate(qpidByteBuffer.remaining()); + qpidByteBuffer.copyTo(body); + } + ByteBuffer headerBuf = view(copy, methodLimit, headerLimit); + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerBuf); + if (body != null) + { + fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate()); + } + } + } + } + + private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buffer) + { + byte typeb = (byte) type.getValue(); + byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; + + int remaining = buffer.remaining(); + boolean first = true; + while (true) + { + int size = min(_maxPayload, remaining); + remaining -= size; + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (remaining == 0) + { + newflags |= LAST_FRAME; + } + + frame(newflags, typeb, track, event.getChannel(), size, buffer); + + if (remaining == 0) + { + break; + } + } + } + + private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buffer) + { + ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE); + + data.put(0, flags); + data.put(1, type); + data.putShort(2, (short) (size + HEADER_SIZE)); + data.put(4, (byte) 0); + data.put(5, track); + data.putShort(6, (short) channel); + + try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(data)) + { + _sender.send(qpidByteBuffer); + } + + if(size > 0) + { + final ByteBuffer view = view(buffer, 0, size); + try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(view)) + { + _sender.send(qpidByteBuffer); + } + buffer.position(buffer.position() + size); + } + } + + public void error(Void v, ProtocolError error) + { + throw new IllegalArgumentException(String.valueOf(error)); + } + + @Override + public void setMaxFrameSize(final int maxFrame) + { + if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + _maxPayload = maxFrame - HEADER_SIZE; + + } + + private static ByteBuffer view(ByteBuffer buffer, int offset, int length) + { + ByteBuffer view = buffer.slice(); + view.position(offset); + int newLimit = Math.min(view.position() + length, view.capacity()); + view.limit(newLimit); + return view.slice(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java new file mode 100644 index 0000000..fa79489 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError; +import org.apache.qpid.tests.protocol.Response; + +public class ErrorResponse implements Response<ProtocolError> +{ + private final ProtocolError _error; + + public ErrorResponse(final ProtocolError protocolError) + { + _error = protocolError; + } + + @Override + public ProtocolError getBody() + { + return _error; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java new file mode 100644 index 0000000..2e6817b --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import org.apache.qpid.server.protocol.v0_10.transport.ExecutionSync; + +public class ExecutionInteraction +{ + private final Interaction _interaction; + private final ExecutionSync _sync; + + public ExecutionInteraction(final Interaction interaction) + { + _interaction = interaction; + _sync = new ExecutionSync(); + } + + public ExecutionInteraction syncId(final int id) + { + _sync.setId(id); + return this; + } + + public Interaction sync() throws Exception + { + _interaction.sendPerformative(_sync); + return _interaction; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java new file mode 100644 index 0000000..fff894b --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java @@ -0,0 +1,190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import static org.apache.qpid.server.transport.util.Functions.str; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Collection; + +import org.apache.qpid.server.protocol.v0_10.transport.Frame; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader; +import org.apache.qpid.server.protocol.v0_10.transport.SegmentType; +import org.apache.qpid.tests.protocol.InputDecoder; +import org.apache.qpid.tests.protocol.Response; + +public class FrameDecoder implements InputDecoder +{ + + private final ProtocolEventReceiver _receiver; + + public enum State + { + PROTO_HDR, + FRAME_HDR, + FRAME_BODY, + ERROR + } + + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); + + private final Assembler _assembler; + + private int _maxFrameSize = 4096; + private State _state; + private ByteBuffer input = null; + private int _needed; + + private byte _flags; + private SegmentType _type; + private byte _track; + private int _channel; + + FrameDecoder(final byte[] headerBytes) + { + _receiver = new ProtocolEventReceiver(headerBytes); + this._assembler = new Assembler(_receiver); + this._state = State.PROTO_HDR; + _needed = 8; + + } + + @Override + public Collection<Response<?>> decode(final ByteBuffer buf) throws Exception + { + int limit = buf.limit(); + int remaining = buf.remaining(); + while (remaining > 0) + { + if (remaining >= _needed) + { + int consumed = _needed; + int pos = buf.position(); + if (input == null) + { + buf.limit(pos + _needed); + input = buf; + _state = next(pos); + buf.limit(limit); + buf.position(pos + consumed); + } + else + { + buf.limit(pos + _needed); + input.put(buf); + buf.limit(limit); + input.flip(); + _state = next(0); + } + + remaining -= consumed; + input = null; + } + else + { + if (input == null) + { + input = ByteBuffer.allocate(_needed); + } + input.put(buf); + _needed -= remaining; + remaining = 0; + } + } + return _receiver.getReceivedEvents(); + } + + private State next(int pos) + { + input.order(ByteOrder.BIG_ENDIAN); + + switch (_state) { + case PROTO_HDR: + if (input.get(pos) != 'A' && + input.get(pos + 1) != 'M' && + input.get(pos + 2) != 'Q' && + input.get(pos + 3) != 'P') + { + error("bad protocol header: %s", str(input)); + return State.ERROR; + } + + byte protoClass = input.get(pos + 4); + byte instance = input.get(pos + 5); + byte major = input.get(pos + 6); + byte minor = input.get(pos + 7); + _assembler.received(new ProtocolHeader(protoClass, instance, major, minor)); + _needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + case FRAME_HDR: + _flags = input.get(pos); + _type = SegmentType.get(input.get(pos + 1)); + int size = (0xFFFF & input.getShort(pos + 2)); + size -= Frame.HEADER_SIZE; + _maxFrameSize = 64 * 1024; + if (size < 0 || size > (_maxFrameSize - 12)) + { + error("bad frame size: %d", size); + return State.ERROR; + } + byte b = input.get(pos + 5); + if ((b & 0xF0) != 0) { + error("non-zero reserved bits in upper nibble of " + + "frame header byte 5: '%x'", b); + return State.ERROR; + } else { + _track = (byte) (b & 0xF); + } + _channel = (0xFFFF & input.getShort(pos + 6)); + if (size == 0) + { + Frame frame = new Frame(_flags, _type, _track, _channel, EMPTY_BYTE_BUFFER); + _assembler.received(frame); + _needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + } + else + { + _needed = size; + return State.FRAME_BODY; + } + case FRAME_BODY: + Frame frame = new Frame(_flags, _type, _track, _channel, input.slice()); + _assembler.received(frame); + _needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + default: + throw new IllegalStateException(); + } + } + + private void error(String fmt, Object ... args) + { + _assembler.received(new ProtocolError(Frame.L1, fmt, args)); + } + + public void setMaxFrameSize(final int maxFrameSize) + { + _maxFrameSize = maxFrameSize; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java new file mode 100644 index 0000000..dfec4f4 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder; +import org.apache.qpid.server.protocol.v0_10.transport.Method; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent; +import org.apache.qpid.server.transport.ByteBufferSender; +import org.apache.qpid.tests.protocol.OutputEncoder; + +public class FrameEncoder implements OutputEncoder +{ + @Override + public ByteBuffer encode(final Object msg) + { + if (msg instanceof ProtocolEvent) + { + final List<ByteBuffer> buffers = new ArrayList<>(); + final AtomicInteger totalSize = new AtomicInteger(); + Disassembler disassembler = new Disassembler(new ByteBufferSender() + { + @Override + public boolean isDirectBufferPreferred() + { + return false; + } + + @Override + public void send(final QpidByteBuffer msg) + { + int remaining = msg.remaining(); + byte[] data = new byte[remaining]; + ByteBuffer byteBuffer = ByteBuffer.wrap(data); + msg.get(data); + buffers.add(byteBuffer); + totalSize.addAndGet(remaining); + } + + @Override + public void flush() + { + + } + + @Override + public void close() + { + + } + }, 512); + + disassembler.send((ProtocolEvent) msg); + ByteBuffer data = ByteBuffer.allocate(totalSize.get()); + for (ByteBuffer buffer : buffers) + { + data.put(buffer); + } + data.flip(); + return data; + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java new file mode 100644 index 0000000..3b7849c --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import java.net.InetSocketAddress; + +import org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10; +import org.apache.qpid.tests.protocol.AbstractFrameTransport; + + +public class FrameTransport extends AbstractFrameTransport<Interaction> +{ + private final byte[] _protocolHeader; + + public FrameTransport(final InetSocketAddress brokerAddress) + { + super(brokerAddress, new FrameDecoder(new ProtocolEngineCreator_0_10().getHeaderIdentifier()), new FrameEncoder()); + _protocolHeader = new ProtocolEngineCreator_0_10().getHeaderIdentifier(); + } + + @Override + public byte[] getProtocolHeader() + { + return _protocolHeader; + } + + @Override + public Interaction newInteraction() + { + return new Interaction(this); + } + + @Override + public FrameTransport connect() + { + super.connect(); + return this; + } + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java new file mode 100644 index 0000000..5d53a89 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import java.nio.ByteBuffer; + +import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder; +import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune; +import org.apache.qpid.server.protocol.v0_10.transport.Method; +import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached; +import org.apache.qpid.tests.protocol.AbstractFrameTransport; +import org.apache.qpid.tests.protocol.AbstractInteraction; + +public class Interaction extends AbstractInteraction<Interaction> +{ + private ConnectionInteraction _connectionInteraction; + private SessionInteraction _sessionInteraction; + private MessageInteraction _messageInteraction; + private ExecutionInteraction _executionInteraction; + private int _channelId; + private TxInteraction _txInteraction; + + public Interaction(final AbstractFrameTransport frameTransport) + { + super(frameTransport); + _connectionInteraction = new ConnectionInteraction(this); + _sessionInteraction = new SessionInteraction(this); + _messageInteraction = new MessageInteraction(this); + _executionInteraction = new ExecutionInteraction(this); + _txInteraction = new TxInteraction(this); + } + + @Override + protected byte[] getProtocolHeader() + { + return getTransport().getProtocolHeader(); + } + + public <T extends Method> Interaction sendPerformative(final T performative) throws Exception + { + performative.setChannel(_channelId); + sendPerformativeAndChainFuture(copyPerformative(performative)); + return this; + } + + public ConnectionInteraction connection() + { + return _connectionInteraction; + } + + private <T extends Method> T copyPerformative(final T src) + { + T dst = (T) Method.create(src.getStructType()); + final BBEncoder encoder = new BBEncoder(4096); + encoder.init(); + src.write(encoder); + ByteBuffer buffer = encoder.buffer(); + + final BBDecoder decoder = new BBDecoder(); + decoder.init(buffer); + dst.read(decoder); + return dst; + } + + public Interaction openAnonymousConnection() throws Exception + { + this.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class) + .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk() + .consumeResponse(ConnectionTune.class) + .connection().tuneOk() + .connection().open() + .consumeResponse(ConnectionOpenOk.class); + return this; + } + + public SessionInteraction session() + { + return _sessionInteraction; + } + + public int getChannelId() + { + return _channelId; + } + + public Interaction channelId(final int channelId) + { + _channelId = channelId; + return this; + } + + public Interaction attachSession(final byte[] sessionName) throws Exception + { + this.session() + .attachName(sessionName) + .attach() + .consumeResponse(SessionAttached.class) + .session().commandPointCommandId(0).commandPoint(); + return this; + } + + public MessageInteraction message() + { + return _messageInteraction; + } + + public ExecutionInteraction execution() + { + return _executionInteraction; + } + + public TxInteraction tx() + { + return _txInteraction; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java new file mode 100644 index 0000000..4660c86 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import org.apache.qpid.server.protocol.v0_10.transport.MessageAccept; +import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode; +import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode; +import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit; +import org.apache.qpid.server.protocol.v0_10.transport.MessageFlow; +import org.apache.qpid.server.protocol.v0_10.transport.MessageSubscribe; +import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer; +import org.apache.qpid.server.protocol.v0_10.transport.RangeSet; + +public class MessageInteraction +{ + private final Interaction _interaction; + private MessageTransfer _transfer; + private MessageSubscribe _subscribe; + private MessageFlow _flow; + private MessageAccept _accept; + + public MessageInteraction(final Interaction interaction) + { + _interaction = interaction; + _transfer = new MessageTransfer(); + _subscribe = new MessageSubscribe(); + _flow = new MessageFlow(); + _accept = new MessageAccept(); + } + + public MessageInteraction transferId(final int id) + { + _transfer.setId(id); + return this; + } + + public MessageInteraction transferDesitnation(final String destination) + { + _transfer.setDestination(destination); + return this; + } + + public Interaction transfer() throws Exception + { + _interaction.sendPerformative(_transfer); + return _interaction; + } + + public MessageInteraction subscribeQueue(final String queueName) + { + _subscribe.setQueue(queueName); + return this; + } + + public MessageInteraction subscribeId(final int id) + { + _subscribe.setId(id); + return this; + } + + public Interaction subscribe() throws Exception + { + return _interaction.sendPerformative(_subscribe); + } + + public MessageInteraction subscribeDestination(final String destination) + { + _subscribe.setDestination(destination); + return this; + } + + public Interaction flow() throws Exception + { + return _interaction.sendPerformative(_flow); + } + + public MessageInteraction flowId(final int id) + { + _flow.setId(id); + return this; + } + + public MessageInteraction flowDestination(final String destination) + { + _flow.setDestination(destination); + return this; + } + + public MessageInteraction flowUnit(final MessageCreditUnit unit) + { + _flow.setUnit(unit); + return this; + } + + public MessageInteraction flowValue(final long value) + { + _flow.setValue(value); + return this; + } + + public MessageInteraction subscribeAcceptMode(final MessageAcceptMode acceptMode) + { + _subscribe.setAcceptMode(acceptMode); + return this; + } + + public MessageInteraction subscribeAcquireMode(final MessageAcquireMode acquireMode) + { + _subscribe.setAcquireMode(acquireMode); + return this; + } + + public Interaction accept() throws Exception + { + return _interaction.sendPerformative(_accept); + } + + public MessageInteraction acceptId(final int id) + { + _accept.setId(id); + return this; + } + + public MessageInteraction acceptTransfers(final RangeSet transfers) + { + _accept.setTransfers(transfers); + return this; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java new file mode 100644 index 0000000..701e92c --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import org.apache.qpid.server.protocol.v0_10.transport.Method; +import org.apache.qpid.tests.protocol.Response; + +public class PerformativeResponse implements Response<Method> +{ + private Method _method; + + public PerformativeResponse(final Method method) + { + _method = method; + } + + @Override + public Method getBody() + { + return _method; + } + + @Override + public String toString() + { + return "PerformativeResponse{" + + "_method=" + _method + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java new file mode 100644 index 0000000..37eb66e --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.qpid.server.protocol.v0_10.transport.Method; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent; +import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader; +import org.apache.qpid.tests.protocol.HeaderResponse; +import org.apache.qpid.tests.protocol.Response; + +public class ProtocolEventReceiver +{ + private Queue<Response<?>> _events = new ConcurrentLinkedQueue<>(); + private final byte[] _headerBytes; + + public ProtocolEventReceiver(final byte[] headerBytes) + { + _headerBytes = headerBytes; + } + + void received(ProtocolEvent msg) + { + if (msg instanceof ProtocolHeader) + { + _events.add(new HeaderResponse(_headerBytes)); + } + else if (msg instanceof Method) + { + _events.add(new PerformativeResponse((Method) msg)); + } + else if (msg instanceof ProtocolError) + { + _events.add(new ErrorResponse((ProtocolError) msg)); + } + } + + public Collection<Response<?>> getReceivedEvents() + { + Collection<Response<?>> results = new ArrayList<>(_events); + _events.removeAll(results); + return results; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java new file mode 100644 index 0000000..fce711d --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import org.apache.qpid.server.protocol.v0_10.transport.Method; +import org.apache.qpid.server.protocol.v0_10.transport.SessionAttach; +import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint; +import org.apache.qpid.server.protocol.v0_10.transport.SessionDetach; +import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush; + +public class SessionInteraction +{ + private final Interaction _interaction; + private SessionAttach _attach; + private SessionDetach _detach; + private SessionCommandPoint _commandPoint; + private SessionFlush _flush; + + public SessionInteraction(final Interaction interaction) + { + _interaction = interaction; + _attach = new SessionAttach(); + _detach = new SessionDetach(); + _commandPoint = new SessionCommandPoint(); + _flush = new SessionFlush(); + } + + public Interaction attach() throws Exception + { + return _interaction.sendPerformative(_attach); + } + + public SessionInteraction attachName(final byte[] name) + { + _attach.setName(name); + return this; + } + + public Interaction detach() throws Exception + { + return _interaction.sendPerformative(_detach); + } + + public SessionInteraction detachName(final byte[] sessionName) + { + _detach.setName(sessionName); + return this; + } + + public Interaction commandPoint() throws Exception + { + return _interaction.sendPerformative(_commandPoint); + } + + public SessionInteraction commandPointCommandId(final int commandId) + { + _commandPoint.setCommandId(commandId); + return this; + } + + public Interaction flush() throws Exception + { + return _interaction.sendPerformative(_flush); + } + + public SessionInteraction flushCompleted() + { + _flush.setCompleted(true); + return this; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java new file mode 100644 index 0000000..14c9912 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import org.apache.qpid.server.protocol.v0_10.transport.TxCommit; +import org.apache.qpid.server.protocol.v0_10.transport.TxSelect; + +public class TxInteraction +{ + private final Interaction _interaction; + private final TxSelect _select; + private final TxCommit _commit; + + public TxInteraction(final Interaction interaction) + { + _interaction = interaction; + _select = new TxSelect(); + _commit = new TxCommit(); + } + + public Interaction select() throws Exception + { + return _interaction.sendPerformative(_select); + } + + public TxInteraction selectId(final int id) + { + _select.setId(id); + return this; + } + + public TxInteraction commitId(final int id) + { + _commit.setId(id); + return this; + } + + public Interaction commit() throws Exception + { + return _interaction.sendPerformative(_commit); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json new file mode 100644 index 0000000..69387fb --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +{ + "name" : "${broker.name}", + "modelVersion" : "7.0", + "authenticationproviders" : [ { + "name" : "anon", + "type" : "Anonymous" + }, { + "name" : "plain", + "type" : "Plain", + "secureOnlyMechanisms" : [], + "users" : [ { + "name" : "admin", + "type" : "managed", + "password" : "admin" + }, { + "name" : "guest", + "type" : "managed", + "password" : "guest" + } ] + } ], + "ports" : [ { + "name" : "AMQP", + "type" : "AMQP", + "authenticationProvider" : "plain", + "port" : "0", + "protocols" : [ "AMQP_0_10" ], + "virtualhostaliases" : [ { + "name" : "defaultAlias", + "type" : "defaultAlias" + }, { + "name" : "hostnameAlias", + "type" : "hostnameAlias" + }, { + "name" : "nameAlias", + "type" : "nameAlias" + } ] + }, { + "name" : "ANONYMOUS_AMQP", + "type" : "AMQP", + "authenticationProvider" : "anon", + "port" : "0", + "protocols" : [ "AMQP_0_10" ], + "virtualhostaliases" : [ { + "name" : "defaultAlias", + "type" : "defaultAlias", + "durable" : true + }, { + "name" : "hostnameAlias", + "type" : "hostnameAlias", + "durable" : true + }, { + "name" : "nameAlias", + "type" : "nameAlias", + "durable" : true + } ] + } ], + "virtualhostnodes" : [] +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java new file mode 100644 index 0000000..1072f7c --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assume.assumeThat; + +import java.net.InetSocketAddress; + +import org.hamcrest.core.IsEqual; +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart; +import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune; +import org.apache.qpid.tests.protocol.ChannelClosedResponse; +import org.apache.qpid.tests.protocol.HeaderResponse; +import org.apache.qpid.tests.protocol.Response; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; + +public class ConnectionTest extends BrokerAdminUsingTestBase +{ + private static final String DEFAULT_LOCALE = "en_US"; + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + @Test + @SpecificationTest(section = "4.3. Version Negotiation", + description = "When the client opens a new socket connection to an AMQP server," + + " it MUST send a protocol header with the client's preferred protocol version." + + "If the requested protocol version is supported, the server MUST send its own protocol" + + " header with the requested version to the socket, and then implement the protocol accordingly") + public void versionNegotiation() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + Response<?> response = interaction.negotiateProtocol().consumeResponse().getLatestResponse(); + assertThat(response, is(instanceOf(HeaderResponse.class))); + assertThat(response.getBody(), is(IsEqual.equalTo(transport.getProtocolHeader()))); + + ConnectionStart connectionStart = interaction.consumeResponse().getLatestResponse(ConnectionStart.class); + assertThat(connectionStart.getMechanisms(), is(notNullValue())); + assertThat(connectionStart.getMechanisms(), contains(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS)); + assertThat(connectionStart.getLocales(), is(notNullValue())); + assertThat(connectionStart.getLocales(), contains(DEFAULT_LOCALE)); + } + } + + @Test + @SpecificationTest(section = "9.connection.start-ok", + description = "An AMQP client MUST handle incoming connection.start controls.") + public void startOk() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class) + .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk() + .consumeResponse().getLatestResponse(ConnectionTune.class); + } + } + + @Test + @SpecificationTest(section = "9.connection.tune-ok", + description = "This control sends the client's connection tuning parameters to the server." + + " Certain fields are negotiated, others provide capability information.") + public void tuneOkAndOpen() throws Exception + { + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class) + .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk() + .consumeResponse(ConnectionTune.class) + .connection().tuneOk() + .connection().open() + .consumeResponse().getLatestResponse(ConnectionOpenOk.class); + } + } + + @Test + @SpecificationTest(section = "9", + description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK") + public void authenticationBypassBySendingTuneOk() throws Exception + { + InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try(FrameTransport transport = new FrameTransport(brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class) + .connection().tuneOk() + .connection().open() + .consumeResponse().getLatestResponse(ConnectionClose.class); + } + } + + @Test + @SpecificationTest(section = "9", + description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK") + public void authenticationBypassBySendingOpen() throws Exception + { + InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try(FrameTransport transport = new FrameTransport(brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse().consumeResponse(ConnectionStart.class) + .connection().open() + .consumeResponse().getLatestResponse(ConnectionClose.class); + } + } + + @Test + @SpecificationTest(section = "9", + description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK") + public void authenticationBypassAfterSendingStartOk() throws Exception + { + InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try(FrameTransport transport = new FrameTransport(brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class) + .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_PLAIN).startOk().consumeResponse(ConnectionSecure.class) + .connection().tuneOk() + .connection().open() + .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class); + } + } + + + @Test + @SpecificationTest(section = "9.connection.tune-ok.minimum", + description = "[...] the minimum negotiated value for max-frame-size is also MIN-MAX-FRAME-SIZE [4096]") + public void tooSmallFrameSize() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTune response = interaction.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class) + .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk() + .consumeResponse().getLatestResponse(ConnectionTune.class); + + interaction.connection().tuneOkChannelMax(response.getChannelMax()) + .tuneOkMaxFrameSize(1024) + .tuneOk() + .connection().open() + .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class); + } + } + + @Test + @SpecificationTest(section = "9.connection.tune-ok.max-frame-size", + description = "If the client specifies a channel max that is higher than the value provided by the server," + + " the server MUST close the connection without attempting a negotiated close." + + " The server may report the error in some fashion to assist implementers.") + public void tooLargeFrameSize() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + ConnectionTune response = interaction.negotiateProtocol().consumeResponse() + .consumeResponse(ConnectionStart.class) + .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk() + .consumeResponse().getLatestResponse(ConnectionTune.class); + + assumeThat(response.hasMaxFrameSize(), is(true)); + assumeThat(response.getMaxFrameSize(), is(lessThan(0xFFFF))); + interaction.connection().tuneOkChannelMax(response.getChannelMax()) + .tuneOkMaxFrameSize(response.getMaxFrameSize() + 1) + .tuneOk() + .connection().open() + .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org