Is this change allowing clients to skip the SASL layer when connecting
to servers that have enabled the SASL layer? If so, how is the new
default behaviour disabled?

The existing but unimplemented 'allowSkip' method previously intended
to enable such behaviour still doesn't do anything, so is there a way
to require clients use a SASL layer as would have been previously
after enabling SASL for a proton-j (and in the past a proton-c)
server?

Robbie

On 6 July 2015 at 00:45,  <r...@apache.org> wrote:
> implemented sasl sniffing for proton-j; this allows the reactor interop tests 
> to pass
>
>
> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/46edaebe
> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/46edaebe
> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/46edaebe
>
> Branch: refs/heads/master
> Commit: 46edaebeb92fa780120f17aa8ac0b54d2feaa8b9
> Parents: bf81c44
> Author: Rafael Schloming <r...@alum.mit.edu>
> Authored: Sun Jul 5 19:08:20 2015 -0400
> Committer: Rafael Schloming <r...@alum.mit.edu>
> Committed: Sun Jul 5 19:33:45 2015 -0400
>
> ----------------------------------------------------------------------
>  proton-c/bindings/python/proton/__init__.py     |   2 +-
>  .../impl/HandshakeSniffingTransportWrapper.java | 182 +++++++++++++++++++
>  .../qpid/proton/engine/impl/SaslImpl.java       |  12 +-
>  .../qpid/proton/engine/impl/SaslSniffer.java    |  53 ++++++
>  .../SslHandshakeSniffingTransportWrapper.java   | 170 +++--------------
>  proton-j/src/main/resources/cengine.py          |  26 ++-
>  6 files changed, 284 insertions(+), 161 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-c/bindings/python/proton/__init__.py
> ----------------------------------------------------------------------
> diff --git a/proton-c/bindings/python/proton/__init__.py 
> b/proton-c/bindings/python/proton/__init__.py
> index 1032edf..9c75800 100644
> --- a/proton-c/bindings/python/proton/__init__.py
> +++ b/proton-c/bindings/python/proton/__init__.py
> @@ -3294,7 +3294,7 @@ A callback for trace logging. The callback is passed 
> the transport and log messa
>    def push(self, binary):
>      n = self._check(pn_transport_push(self._impl, binary))
>      if n != len(binary):
> -      raise OverflowError("unable to process all bytes")
> +      raise OverflowError("unable to process all bytes: %s, %s" % (n, 
> len(binary)))
>
>    def close_tail(self):
>      self._check(pn_transport_close_tail(self._impl))
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
> ----------------------------------------------------------------------
> diff --git 
> a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
>  
> b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
> new file mode 100644
> index 0000000..6a5aac5
> --- /dev/null
> +++ 
> b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
> @@ -0,0 +1,182 @@
> +/*
> + *
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + *
> + */
> +package org.apache.qpid.proton.engine.impl;
> +
> +import java.nio.ByteBuffer;
> +
> +import org.apache.qpid.proton.engine.Transport;
> +import org.apache.qpid.proton.engine.TransportException;
> +import org.apache.qpid.proton.engine.impl.TransportWrapper;
> +
> +public abstract class HandshakeSniffingTransportWrapper<T1 extends 
> TransportWrapper, T2 extends TransportWrapper>
> +    implements TransportWrapper
> +{
> +
> +    protected final T1 _wrapper1;
> +    protected final T2 _wrapper2;
> +
> +    private boolean _tail_closed = false;
> +    private boolean _head_closed = false;
> +    protected TransportWrapper _selectedTransportWrapper;
> +
> +    private final ByteBuffer _determinationBuffer;
> +
> +    protected HandshakeSniffingTransportWrapper
> +        (T1 wrapper1,
> +         T2 wrapper2)
> +    {
> +        _wrapper1 = wrapper1;
> +        _wrapper2 = wrapper2;
> +        _determinationBuffer = ByteBuffer.allocate(bufferSize());
> +    }
> +
> +    @Override
> +    public int capacity()
> +    {
> +        if (isDeterminationMade())
> +        {
> +            return _selectedTransportWrapper.capacity();
> +        }
> +        else
> +        {
> +            if (_tail_closed) { return Transport.END_OF_STREAM; }
> +            return _determinationBuffer.remaining();
> +        }
> +    }
> +
> +    @Override
> +    public int position()
> +    {
> +        if (isDeterminationMade())
> +        {
> +            return _selectedTransportWrapper.position();
> +        }
> +        else
> +        {
> +            if (_tail_closed) { return Transport.END_OF_STREAM; }
> +            return _determinationBuffer.position();
> +        }
> +    }
> +
> +    @Override
> +    public ByteBuffer tail()
> +    {
> +        if (isDeterminationMade())
> +        {
> +            return _selectedTransportWrapper.tail();
> +        }
> +        else
> +        {
> +            return _determinationBuffer;
> +        }
> +    }
> +
> +    protected abstract int bufferSize();
> +
> +    protected abstract void makeDetermination(byte[] bytes);
> +
> +    @Override
> +    public void process() throws TransportException
> +    {
> +        if (isDeterminationMade())
> +        {
> +            _selectedTransportWrapper.process();
> +        }
> +        else if (_determinationBuffer.remaining() == 0)
> +        {
> +            _determinationBuffer.flip();
> +            byte[] bytesInput = new byte[_determinationBuffer.remaining()];
> +            _determinationBuffer.get(bytesInput);
> +            makeDetermination(bytesInput);
> +            _determinationBuffer.rewind();
> +
> +            // TODO what if the selected transport has insufficient 
> capacity?? Maybe use pour, and then try to finish pouring next time round.
> +            _selectedTransportWrapper.tail().put(_determinationBuffer);
> +            _selectedTransportWrapper.process();
> +        } else if (_tail_closed) {
> +            throw new TransportException("connection aborted");
> +        }
> +    }
> +
> +    @Override
> +    public void close_tail()
> +    {
> +        try {
> +            if (isDeterminationMade())
> +            {
> +                _selectedTransportWrapper.close_tail();
> +            }
> +        } finally {
> +            _tail_closed = true;
> +        }
> +    }
> +
> +    @Override
> +    public int pending()
> +    {
> +        if (_head_closed) { return Transport.END_OF_STREAM; }
> +
> +        if (isDeterminationMade()) {
> +            return _selectedTransportWrapper.pending();
> +        } else {
> +            return 0;
> +        }
> +
> +    }
> +
> +    private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
> +
> +    @Override
> +    public ByteBuffer head()
> +    {
> +        if (isDeterminationMade()) {
> +            return _selectedTransportWrapper.head();
> +        } else {
> +            return EMPTY;
> +        }
> +    }
> +
> +    @Override
> +    public void pop(int bytes)
> +    {
> +        if (isDeterminationMade()) {
> +            _selectedTransportWrapper.pop(bytes);
> +        } else if (bytes > 0) {
> +            throw new IllegalStateException("no bytes have been read");
> +        }
> +    }
> +
> +    @Override
> +    public void close_head()
> +    {
> +        if (isDeterminationMade()) {
> +            _selectedTransportWrapper.close_head();
> +        } else {
> +            _head_closed = true;
> +        }
> +    }
> +
> +    protected boolean isDeterminationMade()
> +    {
> +        return _selectedTransportWrapper != null;
> +    }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
> ----------------------------------------------------------------------
> diff --git 
> a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java 
> b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
> index 053c21f..6efb140 100644
> --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
> +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
> @@ -485,7 +485,17 @@ public class SaslImpl implements Sasl, 
> SaslFrameBody.SaslFrameBodyHandler<Void>,
>
>      public TransportWrapper wrap(final TransportInput input, final 
> TransportOutput output)
>      {
> -        return new SaslTransportWrapper(input, output);
> +        return new SaslSniffer(new SaslTransportWrapper(input, output),
> +                               new PlainTransportWrapper(output, input)) {
> +            protected boolean isDeterminationMade() {
> +                if (_role == Role.SERVER) {
> +                    return super.isDeterminationMade();
> +                } else {
> +                    _selectedTransportWrapper = _wrapper1;
> +                    return true;
> +                }
> +            }
> +        };
>      }
>
>      @Override
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
> ----------------------------------------------------------------------
> diff --git 
> a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java 
> b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
> new file mode 100644
> index 0000000..2d92496
> --- /dev/null
> +++ 
> b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
> @@ -0,0 +1,53 @@
> +/*
> + *
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + *
> + */
> +package org.apache.qpid.proton.engine.impl;
> +
> +
> +/**
> + * SaslSniffer
> + *
> + */
> +
> +class SaslSniffer extends 
> HandshakeSniffingTransportWrapper<TransportWrapper, TransportWrapper>
> +{
> +
> +    SaslSniffer(TransportWrapper sasl, TransportWrapper other) {
> +        super(sasl, other);
> +    }
> +
> +    protected int bufferSize() { return AmqpHeader.SASL_HEADER.length; }
> +
> +    protected void makeDetermination(byte[] bytes) {
> +        if (bytes.length < bufferSize()) {
> +            throw new IllegalArgumentException("insufficient bytes");
> +        }
> +
> +        for (int i = 0; i < AmqpHeader.SASL_HEADER.length; i++) {
> +            if (bytes[i] != AmqpHeader.SASL_HEADER[i]) {
> +                _selectedTransportWrapper = _wrapper2;
> +                return;
> +            }
> +        }
> +
> +        _selectedTransportWrapper = _wrapper1;
> +    }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
> ----------------------------------------------------------------------
> diff --git 
> a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
>  
> b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
> index 1efc7a9..c678343 100644
> --- 
> a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
> +++ 
> b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
> @@ -20,150 +20,21 @@
>   */
>  package org.apache.qpid.proton.engine.impl.ssl;
>
> -import java.nio.ByteBuffer;
> -
> -import org.apache.qpid.proton.engine.Transport;
> -import org.apache.qpid.proton.engine.TransportException;
> +import org.apache.qpid.proton.engine.impl.HandshakeSniffingTransportWrapper;
>  import org.apache.qpid.proton.engine.impl.TransportWrapper;
>
> -public class SslHandshakeSniffingTransportWrapper implements 
> SslTransportWrapper
> -{
> -    private static final int MINIMUM_LENGTH_FOR_DETERMINATION = 5;
> -    private final SslTransportWrapper _secureTransportWrapper;
> -    private final TransportWrapper _plainTransportWrapper;
> -
> -    private boolean _tail_closed = false;
> -    private boolean _head_closed = false;
> -    private TransportWrapper _selectedTransportWrapper;
> -
> -    private final ByteBuffer _determinationBuffer = 
> ByteBuffer.allocate(MINIMUM_LENGTH_FOR_DETERMINATION);
> -
> -    SslHandshakeSniffingTransportWrapper
> -        (SslTransportWrapper secureTransportWrapper,
> -         TransportWrapper plainTransportWrapper)
> -    {
> -        _secureTransportWrapper = secureTransportWrapper;
> -        _plainTransportWrapper = plainTransportWrapper;
> -    }
> -
> -    @Override
> -    public int capacity()
> -    {
> -        if (isDeterminationMade())
> -        {
> -            return _selectedTransportWrapper.capacity();
> -        }
> -        else
> -        {
> -            if (_tail_closed) { return Transport.END_OF_STREAM; }
> -            return _determinationBuffer.remaining();
> -        }
> -    }
> -
> -    @Override
> -    public int position()
> -    {
> -        if (isDeterminationMade())
> -        {
> -            return _selectedTransportWrapper.position();
> -        }
> -        else
> -        {
> -            if (_tail_closed) { return Transport.END_OF_STREAM; }
> -            return _determinationBuffer.position();
> -        }
> -    }
> -
> -    @Override
> -    public ByteBuffer tail()
> -    {
> -        if (isDeterminationMade())
> -        {
> -            return _selectedTransportWrapper.tail();
> -        }
> -        else
> -        {
> -            return _determinationBuffer;
> -        }
> -    }
> -
> -    @Override
> -    public void process() throws TransportException
> -    {
> -        if (isDeterminationMade())
> -        {
> -            _selectedTransportWrapper.process();
> -        }
> -        else if (_determinationBuffer.remaining() == 0)
> -        {
> -            _determinationBuffer.flip();
> -            byte[] bytesInput = new byte[_determinationBuffer.remaining()];
> -            _determinationBuffer.get(bytesInput);
> -            makeSslDetermination(bytesInput);
> -            _determinationBuffer.rewind();
> -
> -            // TODO what if the selected transport has insufficient 
> capacity?? Maybe use pour, and then try to finish pouring next time round.
> -            _selectedTransportWrapper.tail().put(_determinationBuffer);
> -            _selectedTransportWrapper.process();
> -        } else if (_tail_closed) {
> -            throw new TransportException("connection aborted");
> -        }
> -    }
> -
> -    @Override
> -    public void close_tail()
> -    {
> -        try {
> -            if (isDeterminationMade())
> -            {
> -                _selectedTransportWrapper.close_tail();
> -            }
> -        } finally {
> -            _tail_closed = true;
> -        }
> -    }
> -
> -    @Override
> -    public int pending()
> -    {
> -        if (_head_closed) { return Transport.END_OF_STREAM; }
> -
> -        if (isDeterminationMade()) {
> -            return _selectedTransportWrapper.pending();
> -        } else {
> -            return 0;
> -        }
> -
> -    }
>
> -    @Override
> -    public ByteBuffer head()
> -    {
> -        if (isDeterminationMade()) {
> -            return _selectedTransportWrapper.head();
> -        } else {
> -            return null;
> -        }
> -    }
> +/**
> + * SslHandshakeSniffingTransportWrapper
> + *
> + */
>
> -    @Override
> -    public void pop(int bytes)
> -    {
> -        if (isDeterminationMade()) {
> -            _selectedTransportWrapper.pop(bytes);
> -        } else if (bytes > 0) {
> -            throw new IllegalStateException("no bytes have been read");
> -        }
> -    }
> +public class SslHandshakeSniffingTransportWrapper extends 
> HandshakeSniffingTransportWrapper<SslTransportWrapper, TransportWrapper>
> +    implements SslTransportWrapper
> +{
>
> -    @Override
> -    public void close_head()
> -    {
> -        if (isDeterminationMade()) {
> -            _selectedTransportWrapper.close_head();
> -        } else {
> -            _head_closed = true;
> -        }
> +    SslHandshakeSniffingTransportWrapper(SslTransportWrapper ssl, 
> TransportWrapper plain) {
> +        super(ssl, plain);
>      }
>
>      @Override
> @@ -171,7 +42,7 @@ public class SslHandshakeSniffingTransportWrapper 
> implements SslTransportWrapper
>      {
>          if(isSecureWrapperSelected())
>          {
> -            return _secureTransportWrapper.getCipherName();
> +            return _wrapper1.getCipherName();
>          }
>          else
>          {
> @@ -185,7 +56,7 @@ public class SslHandshakeSniffingTransportWrapper 
> implements SslTransportWrapper
>      {
>          if (isSecureWrapperSelected())
>          {
> -            return _secureTransportWrapper.getProtocolName();
> +            return _wrapper1.getProtocolName();
>          }
>          else
>          {
> @@ -195,32 +66,33 @@ public class SslHandshakeSniffingTransportWrapper 
> implements SslTransportWrapper
>
>      private boolean isSecureWrapperSelected()
>      {
> -        return _selectedTransportWrapper == _secureTransportWrapper;
> +        return _selectedTransportWrapper == _wrapper1;
>      }
>
> -    private boolean isDeterminationMade()
> -    {
> -        return _selectedTransportWrapper != null;
> +    protected int bufferSize() {
> +        // minimum length for determination
> +        return 5;
>      }
>
> -    private void makeSslDetermination(byte[] bytesInput)
> +    protected void makeDetermination(byte[] bytesInput)
>      {
>          boolean isSecure = checkForSslHandshake(bytesInput);
>          if (isSecure)
>          {
> -            _selectedTransportWrapper = _secureTransportWrapper;
> +            _selectedTransportWrapper = _wrapper1;
>          }
>          else
>          {
> -            _selectedTransportWrapper = _plainTransportWrapper;
> +            _selectedTransportWrapper = _wrapper2;
>          }
>      }
> +
>      // TODO perhaps the sniffer should save up the bytes from each
>      // input call until it has sufficient bytes to make the determination
>      // and only then pass them to the secure or plain wrapped transport?
>      private boolean checkForSslHandshake(byte[] buf)
>      {
> -        if (buf.length >= MINIMUM_LENGTH_FOR_DETERMINATION)
> +        if (buf.length >= bufferSize())
>          {
>              /*
>               * SSLv2 Client Hello format
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/resources/cengine.py
> ----------------------------------------------------------------------
> diff --git a/proton-j/src/main/resources/cengine.py 
> b/proton-j/src/main/resources/cengine.py
> index 1e89220..c94d023 100644
> --- a/proton-j/src/main/resources/cengine.py
> +++ b/proton-j/src/main/resources/cengine.py
> @@ -940,16 +940,22 @@ def pn_transport_capacity(trans):
>    return trans.impl.capacity()
>
>  def pn_transport_push(trans, input):
> -  cap = pn_transport_capacity(trans)
> -  if cap < 0:
> -    return cap
> -  elif len(input) > cap:
> -    input = input[:cap]
> -
> -  bb = trans.impl.tail()
> -  bb.put(array(input, 'b'))
> -  trans.impl.process()
> -  return len(input)
> +  result = 0
> +  while input:
> +    cap = pn_transport_capacity(trans)
> +    if cap < 0:
> +      return cap
> +    elif len(input) > cap:
> +      trimmed = input[:cap]
> +    else:
> +      trimmed = input
> +
> +    bb = trans.impl.tail()
> +    bb.put(array(trimmed, 'b'))
> +    trans.impl.process()
> +    input = input[cap:]
> +    result += len(trimmed)
> +  return result
>
>  def pn_transport_close_head(trans):
>    trans.impl.close_head()
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
> For additional commands, e-mail: commits-h...@qpid.apache.org
>

Reply via email to