implemented sasl sniffing for proton-j; this allows the reactor interop tests to pass
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/46edaebe Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/46edaebe Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/46edaebe Branch: refs/heads/master Commit: 46edaebeb92fa780120f17aa8ac0b54d2feaa8b9 Parents: bf81c44 Author: Rafael Schloming <r...@alum.mit.edu> Authored: Sun Jul 5 19:08:20 2015 -0400 Committer: Rafael Schloming <r...@alum.mit.edu> Committed: Sun Jul 5 19:33:45 2015 -0400 ---------------------------------------------------------------------- proton-c/bindings/python/proton/__init__.py | 2 +- .../impl/HandshakeSniffingTransportWrapper.java | 182 +++++++++++++++++++ .../qpid/proton/engine/impl/SaslImpl.java | 12 +- .../qpid/proton/engine/impl/SaslSniffer.java | 53 ++++++ .../SslHandshakeSniffingTransportWrapper.java | 170 +++-------------- proton-j/src/main/resources/cengine.py | 26 ++- 6 files changed, 284 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-c/bindings/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py index 1032edf..9c75800 100644 --- a/proton-c/bindings/python/proton/__init__.py +++ b/proton-c/bindings/python/proton/__init__.py @@ -3294,7 +3294,7 @@ A callback for trace logging. The callback is passed the transport and log messa def push(self, binary): n = self._check(pn_transport_push(self._impl, binary)) if n != len(binary): - raise OverflowError("unable to process all bytes") + raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary))) def close_tail(self): self._check(pn_transport_close_tail(self._impl)) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java new file mode 100644 index 0000000..6a5aac5 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.engine.impl; + +import java.nio.ByteBuffer; + +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.TransportException; +import org.apache.qpid.proton.engine.impl.TransportWrapper; + +public abstract class HandshakeSniffingTransportWrapper<T1 extends TransportWrapper, T2 extends TransportWrapper> + implements TransportWrapper +{ + + protected final T1 _wrapper1; + protected final T2 _wrapper2; + + private boolean _tail_closed = false; + private boolean _head_closed = false; + protected TransportWrapper _selectedTransportWrapper; + + private final ByteBuffer _determinationBuffer; + + protected HandshakeSniffingTransportWrapper + (T1 wrapper1, + T2 wrapper2) + { + _wrapper1 = wrapper1; + _wrapper2 = wrapper2; + _determinationBuffer = ByteBuffer.allocate(bufferSize()); + } + + @Override + public int capacity() + { + if (isDeterminationMade()) + { + return _selectedTransportWrapper.capacity(); + } + else + { + if (_tail_closed) { return Transport.END_OF_STREAM; } + return _determinationBuffer.remaining(); + } + } + + @Override + public int position() + { + if (isDeterminationMade()) + { + return _selectedTransportWrapper.position(); + } + else + { + if (_tail_closed) { return Transport.END_OF_STREAM; } + return _determinationBuffer.position(); + } + } + + @Override + public ByteBuffer tail() + { + if (isDeterminationMade()) + { + return _selectedTransportWrapper.tail(); + } + else + { + return _determinationBuffer; + } + } + + protected abstract int bufferSize(); + + protected abstract void makeDetermination(byte[] bytes); + + @Override + public void process() throws TransportException + { + if (isDeterminationMade()) + { + _selectedTransportWrapper.process(); + } + else if (_determinationBuffer.remaining() == 0) + { + _determinationBuffer.flip(); + byte[] bytesInput = new byte[_determinationBuffer.remaining()]; + _determinationBuffer.get(bytesInput); + makeDetermination(bytesInput); + _determinationBuffer.rewind(); + + // TODO what if the selected transport has insufficient capacity?? Maybe use pour, and then try to finish pouring next time round. + _selectedTransportWrapper.tail().put(_determinationBuffer); + _selectedTransportWrapper.process(); + } else if (_tail_closed) { + throw new TransportException("connection aborted"); + } + } + + @Override + public void close_tail() + { + try { + if (isDeterminationMade()) + { + _selectedTransportWrapper.close_tail(); + } + } finally { + _tail_closed = true; + } + } + + @Override + public int pending() + { + if (_head_closed) { return Transport.END_OF_STREAM; } + + if (isDeterminationMade()) { + return _selectedTransportWrapper.pending(); + } else { + return 0; + } + + } + + private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); + + @Override + public ByteBuffer head() + { + if (isDeterminationMade()) { + return _selectedTransportWrapper.head(); + } else { + return EMPTY; + } + } + + @Override + public void pop(int bytes) + { + if (isDeterminationMade()) { + _selectedTransportWrapper.pop(bytes); + } else if (bytes > 0) { + throw new IllegalStateException("no bytes have been read"); + } + } + + @Override + public void close_head() + { + if (isDeterminationMade()) { + _selectedTransportWrapper.close_head(); + } else { + _head_closed = true; + } + } + + protected boolean isDeterminationMade() + { + return _selectedTransportWrapper != null; + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java index 053c21f..6efb140 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java @@ -485,7 +485,17 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, public TransportWrapper wrap(final TransportInput input, final TransportOutput output) { - return new SaslTransportWrapper(input, output); + return new SaslSniffer(new SaslTransportWrapper(input, output), + new PlainTransportWrapper(output, input)) { + protected boolean isDeterminationMade() { + if (_role == Role.SERVER) { + return super.isDeterminationMade(); + } else { + _selectedTransportWrapper = _wrapper1; + return true; + } + } + }; } @Override http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java new file mode 100644 index 0000000..2d92496 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.engine.impl; + + +/** + * SaslSniffer + * + */ + +class SaslSniffer extends HandshakeSniffingTransportWrapper<TransportWrapper, TransportWrapper> +{ + + SaslSniffer(TransportWrapper sasl, TransportWrapper other) { + super(sasl, other); + } + + protected int bufferSize() { return AmqpHeader.SASL_HEADER.length; } + + protected void makeDetermination(byte[] bytes) { + if (bytes.length < bufferSize()) { + throw new IllegalArgumentException("insufficient bytes"); + } + + for (int i = 0; i < AmqpHeader.SASL_HEADER.length; i++) { + if (bytes[i] != AmqpHeader.SASL_HEADER[i]) { + _selectedTransportWrapper = _wrapper2; + return; + } + } + + _selectedTransportWrapper = _wrapper1; + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java index 1efc7a9..c678343 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java @@ -20,150 +20,21 @@ */ package org.apache.qpid.proton.engine.impl.ssl; -import java.nio.ByteBuffer; - -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.engine.TransportException; +import org.apache.qpid.proton.engine.impl.HandshakeSniffingTransportWrapper; import org.apache.qpid.proton.engine.impl.TransportWrapper; -public class SslHandshakeSniffingTransportWrapper implements SslTransportWrapper -{ - private static final int MINIMUM_LENGTH_FOR_DETERMINATION = 5; - private final SslTransportWrapper _secureTransportWrapper; - private final TransportWrapper _plainTransportWrapper; - - private boolean _tail_closed = false; - private boolean _head_closed = false; - private TransportWrapper _selectedTransportWrapper; - - private final ByteBuffer _determinationBuffer = ByteBuffer.allocate(MINIMUM_LENGTH_FOR_DETERMINATION); - - SslHandshakeSniffingTransportWrapper - (SslTransportWrapper secureTransportWrapper, - TransportWrapper plainTransportWrapper) - { - _secureTransportWrapper = secureTransportWrapper; - _plainTransportWrapper = plainTransportWrapper; - } - - @Override - public int capacity() - { - if (isDeterminationMade()) - { - return _selectedTransportWrapper.capacity(); - } - else - { - if (_tail_closed) { return Transport.END_OF_STREAM; } - return _determinationBuffer.remaining(); - } - } - - @Override - public int position() - { - if (isDeterminationMade()) - { - return _selectedTransportWrapper.position(); - } - else - { - if (_tail_closed) { return Transport.END_OF_STREAM; } - return _determinationBuffer.position(); - } - } - - @Override - public ByteBuffer tail() - { - if (isDeterminationMade()) - { - return _selectedTransportWrapper.tail(); - } - else - { - return _determinationBuffer; - } - } - - @Override - public void process() throws TransportException - { - if (isDeterminationMade()) - { - _selectedTransportWrapper.process(); - } - else if (_determinationBuffer.remaining() == 0) - { - _determinationBuffer.flip(); - byte[] bytesInput = new byte[_determinationBuffer.remaining()]; - _determinationBuffer.get(bytesInput); - makeSslDetermination(bytesInput); - _determinationBuffer.rewind(); - - // TODO what if the selected transport has insufficient capacity?? Maybe use pour, and then try to finish pouring next time round. - _selectedTransportWrapper.tail().put(_determinationBuffer); - _selectedTransportWrapper.process(); - } else if (_tail_closed) { - throw new TransportException("connection aborted"); - } - } - - @Override - public void close_tail() - { - try { - if (isDeterminationMade()) - { - _selectedTransportWrapper.close_tail(); - } - } finally { - _tail_closed = true; - } - } - - @Override - public int pending() - { - if (_head_closed) { return Transport.END_OF_STREAM; } - - if (isDeterminationMade()) { - return _selectedTransportWrapper.pending(); - } else { - return 0; - } - - } - @Override - public ByteBuffer head() - { - if (isDeterminationMade()) { - return _selectedTransportWrapper.head(); - } else { - return null; - } - } +/** + * SslHandshakeSniffingTransportWrapper + * + */ - @Override - public void pop(int bytes) - { - if (isDeterminationMade()) { - _selectedTransportWrapper.pop(bytes); - } else if (bytes > 0) { - throw new IllegalStateException("no bytes have been read"); - } - } +public class SslHandshakeSniffingTransportWrapper extends HandshakeSniffingTransportWrapper<SslTransportWrapper, TransportWrapper> + implements SslTransportWrapper +{ - @Override - public void close_head() - { - if (isDeterminationMade()) { - _selectedTransportWrapper.close_head(); - } else { - _head_closed = true; - } + SslHandshakeSniffingTransportWrapper(SslTransportWrapper ssl, TransportWrapper plain) { + super(ssl, plain); } @Override @@ -171,7 +42,7 @@ public class SslHandshakeSniffingTransportWrapper implements SslTransportWrapper { if(isSecureWrapperSelected()) { - return _secureTransportWrapper.getCipherName(); + return _wrapper1.getCipherName(); } else { @@ -185,7 +56,7 @@ public class SslHandshakeSniffingTransportWrapper implements SslTransportWrapper { if (isSecureWrapperSelected()) { - return _secureTransportWrapper.getProtocolName(); + return _wrapper1.getProtocolName(); } else { @@ -195,32 +66,33 @@ public class SslHandshakeSniffingTransportWrapper implements SslTransportWrapper private boolean isSecureWrapperSelected() { - return _selectedTransportWrapper == _secureTransportWrapper; + return _selectedTransportWrapper == _wrapper1; } - private boolean isDeterminationMade() - { - return _selectedTransportWrapper != null; + protected int bufferSize() { + // minimum length for determination + return 5; } - private void makeSslDetermination(byte[] bytesInput) + protected void makeDetermination(byte[] bytesInput) { boolean isSecure = checkForSslHandshake(bytesInput); if (isSecure) { - _selectedTransportWrapper = _secureTransportWrapper; + _selectedTransportWrapper = _wrapper1; } else { - _selectedTransportWrapper = _plainTransportWrapper; + _selectedTransportWrapper = _wrapper2; } } + // TODO perhaps the sniffer should save up the bytes from each // input call until it has sufficient bytes to make the determination // and only then pass them to the secure or plain wrapped transport? private boolean checkForSslHandshake(byte[] buf) { - if (buf.length >= MINIMUM_LENGTH_FOR_DETERMINATION) + if (buf.length >= bufferSize()) { /* * SSLv2 Client Hello format http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/resources/cengine.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/cengine.py b/proton-j/src/main/resources/cengine.py index 1e89220..c94d023 100644 --- a/proton-j/src/main/resources/cengine.py +++ b/proton-j/src/main/resources/cengine.py @@ -940,16 +940,22 @@ def pn_transport_capacity(trans): return trans.impl.capacity() def pn_transport_push(trans, input): - cap = pn_transport_capacity(trans) - if cap < 0: - return cap - elif len(input) > cap: - input = input[:cap] - - bb = trans.impl.tail() - bb.put(array(input, 'b')) - trans.impl.process() - return len(input) + result = 0 + while input: + cap = pn_transport_capacity(trans) + if cap < 0: + return cap + elif len(input) > cap: + trimmed = input[:cap] + else: + trimmed = input + + bb = trans.impl.tail() + bb.put(array(trimmed, 'b')) + trans.impl.process() + input = input[cap:] + result += len(trimmed) + return result def pn_transport_close_head(trans): trans.impl.close_head() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org