implemented sasl sniffing for proton-j; this allows the reactor interop tests 
to pass


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/46edaebe
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/46edaebe
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/46edaebe

Branch: refs/heads/master
Commit: 46edaebeb92fa780120f17aa8ac0b54d2feaa8b9
Parents: bf81c44
Author: Rafael Schloming <r...@alum.mit.edu>
Authored: Sun Jul 5 19:08:20 2015 -0400
Committer: Rafael Schloming <r...@alum.mit.edu>
Committed: Sun Jul 5 19:33:45 2015 -0400

----------------------------------------------------------------------
 proton-c/bindings/python/proton/__init__.py     |   2 +-
 .../impl/HandshakeSniffingTransportWrapper.java | 182 +++++++++++++++++++
 .../qpid/proton/engine/impl/SaslImpl.java       |  12 +-
 .../qpid/proton/engine/impl/SaslSniffer.java    |  53 ++++++
 .../SslHandshakeSniffingTransportWrapper.java   | 170 +++--------------
 proton-j/src/main/resources/cengine.py          |  26 ++-
 6 files changed, 284 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py 
b/proton-c/bindings/python/proton/__init__.py
index 1032edf..9c75800 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -3294,7 +3294,7 @@ A callback for trace logging. The callback is passed the 
transport and log messa
   def push(self, binary):
     n = self._check(pn_transport_push(self._impl, binary))
     if n != len(binary):
-      raise OverflowError("unable to process all bytes")
+      raise OverflowError("unable to process all bytes: %s, %s" % (n, 
len(binary)))
 
   def close_tail(self):
     self._check(pn_transport_close_tail(self._impl))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
new file mode 100644
index 0000000..6a5aac5
--- /dev/null
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportException;
+import org.apache.qpid.proton.engine.impl.TransportWrapper;
+
+public abstract class HandshakeSniffingTransportWrapper<T1 extends 
TransportWrapper, T2 extends TransportWrapper>
+    implements TransportWrapper
+{
+
+    protected final T1 _wrapper1;
+    protected final T2 _wrapper2;
+
+    private boolean _tail_closed = false;
+    private boolean _head_closed = false;
+    protected TransportWrapper _selectedTransportWrapper;
+
+    private final ByteBuffer _determinationBuffer;
+
+    protected HandshakeSniffingTransportWrapper
+        (T1 wrapper1,
+         T2 wrapper2)
+    {
+        _wrapper1 = wrapper1;
+        _wrapper2 = wrapper2;
+        _determinationBuffer = ByteBuffer.allocate(bufferSize());
+    }
+
+    @Override
+    public int capacity()
+    {
+        if (isDeterminationMade())
+        {
+            return _selectedTransportWrapper.capacity();
+        }
+        else
+        {
+            if (_tail_closed) { return Transport.END_OF_STREAM; }
+            return _determinationBuffer.remaining();
+        }
+    }
+
+    @Override
+    public int position()
+    {
+        if (isDeterminationMade())
+        {
+            return _selectedTransportWrapper.position();
+        }
+        else
+        {
+            if (_tail_closed) { return Transport.END_OF_STREAM; }
+            return _determinationBuffer.position();
+        }
+    }
+
+    @Override
+    public ByteBuffer tail()
+    {
+        if (isDeterminationMade())
+        {
+            return _selectedTransportWrapper.tail();
+        }
+        else
+        {
+            return _determinationBuffer;
+        }
+    }
+
+    protected abstract int bufferSize();
+
+    protected abstract void makeDetermination(byte[] bytes);
+
+    @Override
+    public void process() throws TransportException
+    {
+        if (isDeterminationMade())
+        {
+            _selectedTransportWrapper.process();
+        }
+        else if (_determinationBuffer.remaining() == 0)
+        {
+            _determinationBuffer.flip();
+            byte[] bytesInput = new byte[_determinationBuffer.remaining()];
+            _determinationBuffer.get(bytesInput);
+            makeDetermination(bytesInput);
+            _determinationBuffer.rewind();
+
+            // TODO what if the selected transport has insufficient capacity?? 
Maybe use pour, and then try to finish pouring next time round.
+            _selectedTransportWrapper.tail().put(_determinationBuffer);
+            _selectedTransportWrapper.process();
+        } else if (_tail_closed) {
+            throw new TransportException("connection aborted");
+        }
+    }
+
+    @Override
+    public void close_tail()
+    {
+        try {
+            if (isDeterminationMade())
+            {
+                _selectedTransportWrapper.close_tail();
+            }
+        } finally {
+            _tail_closed = true;
+        }
+    }
+
+    @Override
+    public int pending()
+    {
+        if (_head_closed) { return Transport.END_OF_STREAM; }
+
+        if (isDeterminationMade()) {
+            return _selectedTransportWrapper.pending();
+        } else {
+            return 0;
+        }
+
+    }
+
+    private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
+
+    @Override
+    public ByteBuffer head()
+    {
+        if (isDeterminationMade()) {
+            return _selectedTransportWrapper.head();
+        } else {
+            return EMPTY;
+        }
+    }
+
+    @Override
+    public void pop(int bytes)
+    {
+        if (isDeterminationMade()) {
+            _selectedTransportWrapper.pop(bytes);
+        } else if (bytes > 0) {
+            throw new IllegalStateException("no bytes have been read");
+        }
+    }
+
+    @Override
+    public void close_head()
+    {
+        if (isDeterminationMade()) {
+            _selectedTransportWrapper.close_head();
+        } else {
+            _head_closed = true;
+        }
+    }
+
+    protected boolean isDeterminationMade()
+    {
+        return _selectedTransportWrapper != null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
index 053c21f..6efb140 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
@@ -485,7 +485,17 @@ public class SaslImpl implements Sasl, 
SaslFrameBody.SaslFrameBodyHandler<Void>,
 
     public TransportWrapper wrap(final TransportInput input, final 
TransportOutput output)
     {
-        return new SaslTransportWrapper(input, output);
+        return new SaslSniffer(new SaslTransportWrapper(input, output),
+                               new PlainTransportWrapper(output, input)) {
+            protected boolean isDeterminationMade() {
+                if (_role == Role.SERVER) {
+                    return super.isDeterminationMade();
+                } else {
+                    _selectedTransportWrapper = _wrapper1;
+                    return true;
+                }
+            }
+        };
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
new file mode 100644
index 0000000..2d92496
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine.impl;
+
+
+/**
+ * SaslSniffer
+ *
+ */
+
+class SaslSniffer extends HandshakeSniffingTransportWrapper<TransportWrapper, 
TransportWrapper>
+{
+
+    SaslSniffer(TransportWrapper sasl, TransportWrapper other) {
+        super(sasl, other);
+    }
+
+    protected int bufferSize() { return AmqpHeader.SASL_HEADER.length; }
+
+    protected void makeDetermination(byte[] bytes) {
+        if (bytes.length < bufferSize()) {
+            throw new IllegalArgumentException("insufficient bytes");
+        }
+
+        for (int i = 0; i < AmqpHeader.SASL_HEADER.length; i++) {
+            if (bytes[i] != AmqpHeader.SASL_HEADER[i]) {
+                _selectedTransportWrapper = _wrapper2;
+                return;
+            }
+        }
+
+        _selectedTransportWrapper = _wrapper1;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
index 1efc7a9..c678343 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
@@ -20,150 +20,21 @@
  */
 package org.apache.qpid.proton.engine.impl.ssl;
 
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.TransportException;
+import org.apache.qpid.proton.engine.impl.HandshakeSniffingTransportWrapper;
 import org.apache.qpid.proton.engine.impl.TransportWrapper;
 
-public class SslHandshakeSniffingTransportWrapper implements 
SslTransportWrapper
-{
-    private static final int MINIMUM_LENGTH_FOR_DETERMINATION = 5;
-    private final SslTransportWrapper _secureTransportWrapper;
-    private final TransportWrapper _plainTransportWrapper;
-
-    private boolean _tail_closed = false;
-    private boolean _head_closed = false;
-    private TransportWrapper _selectedTransportWrapper;
-
-    private final ByteBuffer _determinationBuffer = 
ByteBuffer.allocate(MINIMUM_LENGTH_FOR_DETERMINATION);
-
-    SslHandshakeSniffingTransportWrapper
-        (SslTransportWrapper secureTransportWrapper,
-         TransportWrapper plainTransportWrapper)
-    {
-        _secureTransportWrapper = secureTransportWrapper;
-        _plainTransportWrapper = plainTransportWrapper;
-    }
-
-    @Override
-    public int capacity()
-    {
-        if (isDeterminationMade())
-        {
-            return _selectedTransportWrapper.capacity();
-        }
-        else
-        {
-            if (_tail_closed) { return Transport.END_OF_STREAM; }
-            return _determinationBuffer.remaining();
-        }
-    }
-
-    @Override
-    public int position()
-    {
-        if (isDeterminationMade())
-        {
-            return _selectedTransportWrapper.position();
-        }
-        else
-        {
-            if (_tail_closed) { return Transport.END_OF_STREAM; }
-            return _determinationBuffer.position();
-        }
-    }
-
-    @Override
-    public ByteBuffer tail()
-    {
-        if (isDeterminationMade())
-        {
-            return _selectedTransportWrapper.tail();
-        }
-        else
-        {
-            return _determinationBuffer;
-        }
-    }
-
-    @Override
-    public void process() throws TransportException
-    {
-        if (isDeterminationMade())
-        {
-            _selectedTransportWrapper.process();
-        }
-        else if (_determinationBuffer.remaining() == 0)
-        {
-            _determinationBuffer.flip();
-            byte[] bytesInput = new byte[_determinationBuffer.remaining()];
-            _determinationBuffer.get(bytesInput);
-            makeSslDetermination(bytesInput);
-            _determinationBuffer.rewind();
-
-            // TODO what if the selected transport has insufficient capacity?? 
Maybe use pour, and then try to finish pouring next time round.
-            _selectedTransportWrapper.tail().put(_determinationBuffer);
-            _selectedTransportWrapper.process();
-        } else if (_tail_closed) {
-            throw new TransportException("connection aborted");
-        }
-    }
-
-    @Override
-    public void close_tail()
-    {
-        try {
-            if (isDeterminationMade())
-            {
-                _selectedTransportWrapper.close_tail();
-            }
-        } finally {
-            _tail_closed = true;
-        }
-    }
-
-    @Override
-    public int pending()
-    {
-        if (_head_closed) { return Transport.END_OF_STREAM; }
-
-        if (isDeterminationMade()) {
-            return _selectedTransportWrapper.pending();
-        } else {
-            return 0;
-        }
-
-    }
 
-    @Override
-    public ByteBuffer head()
-    {
-        if (isDeterminationMade()) {
-            return _selectedTransportWrapper.head();
-        } else {
-            return null;
-        }
-    }
+/**
+ * SslHandshakeSniffingTransportWrapper
+ *
+ */
 
-    @Override
-    public void pop(int bytes)
-    {
-        if (isDeterminationMade()) {
-            _selectedTransportWrapper.pop(bytes);
-        } else if (bytes > 0) {
-            throw new IllegalStateException("no bytes have been read");
-        }
-    }
+public class SslHandshakeSniffingTransportWrapper extends 
HandshakeSniffingTransportWrapper<SslTransportWrapper, TransportWrapper>
+    implements SslTransportWrapper
+{
 
-    @Override
-    public void close_head()
-    {
-        if (isDeterminationMade()) {
-            _selectedTransportWrapper.close_head();
-        } else {
-            _head_closed = true;
-        }
+    SslHandshakeSniffingTransportWrapper(SslTransportWrapper ssl, 
TransportWrapper plain) {
+        super(ssl, plain);
     }
 
     @Override
@@ -171,7 +42,7 @@ public class SslHandshakeSniffingTransportWrapper implements 
SslTransportWrapper
     {
         if(isSecureWrapperSelected())
         {
-            return _secureTransportWrapper.getCipherName();
+            return _wrapper1.getCipherName();
         }
         else
         {
@@ -185,7 +56,7 @@ public class SslHandshakeSniffingTransportWrapper implements 
SslTransportWrapper
     {
         if (isSecureWrapperSelected())
         {
-            return _secureTransportWrapper.getProtocolName();
+            return _wrapper1.getProtocolName();
         }
         else
         {
@@ -195,32 +66,33 @@ public class SslHandshakeSniffingTransportWrapper 
implements SslTransportWrapper
 
     private boolean isSecureWrapperSelected()
     {
-        return _selectedTransportWrapper == _secureTransportWrapper;
+        return _selectedTransportWrapper == _wrapper1;
     }
 
-    private boolean isDeterminationMade()
-    {
-        return _selectedTransportWrapper != null;
+    protected int bufferSize() {
+        // minimum length for determination
+        return 5;
     }
 
-    private void makeSslDetermination(byte[] bytesInput)
+    protected void makeDetermination(byte[] bytesInput)
     {
         boolean isSecure = checkForSslHandshake(bytesInput);
         if (isSecure)
         {
-            _selectedTransportWrapper = _secureTransportWrapper;
+            _selectedTransportWrapper = _wrapper1;
         }
         else
         {
-            _selectedTransportWrapper = _plainTransportWrapper;
+            _selectedTransportWrapper = _wrapper2;
         }
     }
+
     // TODO perhaps the sniffer should save up the bytes from each
     // input call until it has sufficient bytes to make the determination
     // and only then pass them to the secure or plain wrapped transport?
     private boolean checkForSslHandshake(byte[] buf)
     {
-        if (buf.length >= MINIMUM_LENGTH_FOR_DETERMINATION)
+        if (buf.length >= bufferSize())
         {
             /*
              * SSLv2 Client Hello format

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/46edaebe/proton-j/src/main/resources/cengine.py
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/cengine.py 
b/proton-j/src/main/resources/cengine.py
index 1e89220..c94d023 100644
--- a/proton-j/src/main/resources/cengine.py
+++ b/proton-j/src/main/resources/cengine.py
@@ -940,16 +940,22 @@ def pn_transport_capacity(trans):
   return trans.impl.capacity()
 
 def pn_transport_push(trans, input):
-  cap = pn_transport_capacity(trans)
-  if cap < 0:
-    return cap
-  elif len(input) > cap:
-    input = input[:cap]
-
-  bb = trans.impl.tail()
-  bb.put(array(input, 'b'))
-  trans.impl.process()
-  return len(input)
+  result = 0
+  while input:
+    cap = pn_transport_capacity(trans)
+    if cap < 0:
+      return cap
+    elif len(input) > cap:
+      trimmed = input[:cap]
+    else:
+      trimmed = input
+
+    bb = trans.impl.tail()
+    bb.put(array(trimmed, 'b'))
+    trans.impl.process()
+    input = input[cap:]
+    result += len(trimmed)
+  return result
 
 def pn_transport_close_head(trans):
   trans.impl.close_head()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to