This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 9b396b409899283cce14d6a2253c73865faededa Author: Alex Rudyy <oru...@apache.org> AuthorDate: Mon Jun 14 22:07:18 2021 +0100 QPID-8520: [Broker-J] Remove Qpid port-unification support for Jetty --- .../plugin/portunification/MarkableEndPoint.java | 275 --------------- .../TlsOrPlainConnectionFactory.java | 375 --------------------- .../TlsOrPlainConnectionFactoryTest.java | 196 ----------- 3 files changed, 846 deletions(-) diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/MarkableEndPoint.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/MarkableEndPoint.java deleted file mode 100644 index 925fc35..0000000 --- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/MarkableEndPoint.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.management.plugin.portunification; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.ReadPendingException; -import java.nio.channels.WritePendingException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; - -public class MarkableEndPoint implements EndPoint -{ - private final EndPoint _underlying; - private final List<ByteBuffer> _preserved = new ArrayList<>(); - private volatile boolean _marked; - - MarkableEndPoint(final EndPoint underlying) - { - _underlying = underlying; - } - - @Override - public InetSocketAddress getLocalAddress() - { - return _underlying.getLocalAddress(); - } - - @Override - public InetSocketAddress getRemoteAddress() - { - return _underlying.getRemoteAddress(); - } - - @Override - public boolean isOpen() - { - return _underlying.isOpen(); - } - - @Override - public long getCreatedTimeStamp() - { - return _underlying.getCreatedTimeStamp(); - } - - @Override - public void shutdownOutput() - { - _underlying.shutdownOutput(); - } - - @Override - public boolean isOutputShutdown() - { - return _underlying.isOutputShutdown(); - } - - @Override - public boolean isInputShutdown() - { - return _underlying.isInputShutdown(); - } - - @Override - public void close() - { - _underlying.close(); - } - - @Override - public boolean flush(final ByteBuffer... buffer) throws IOException - { - return _underlying.flush(buffer); - } - - @Override - public Object getTransport() - { - return _underlying.getTransport(); - } - - @Override - public long getIdleTimeout() - { - return _underlying.getIdleTimeout(); - } - - @Override - public void setIdleTimeout(final long idleTimeout) - { - _underlying.setIdleTimeout(idleTimeout); - } - - @Override - public void fillInterested(final Callback callback) throws ReadPendingException - { - _underlying.fillInterested(callback); - } - - @Override - public boolean tryFillInterested(final Callback callback) - { - return _underlying.tryFillInterested(callback); - } - - @Override - public boolean isFillInterested() - { - return _underlying.isFillInterested(); - } - - @Override - public void write(final Callback callback, final ByteBuffer... buffers) throws WritePendingException - { - _underlying.write(callback, buffers); - } - - @Override - public Connection getConnection() - { - return _underlying.getConnection(); - } - - @Override - public void setConnection(final Connection connection) - { - _underlying.setConnection(connection); - } - - @Override - public void onOpen() - { - _underlying.onOpen(); - } - - @Override - public void onClose() - { - _underlying.onClose(); - } - - @Override - public boolean isOptimizedForDirectBuffers() - { - return _underlying.isOptimizedForDirectBuffers(); - } - - @Override - public void upgrade(final Connection newConnection) - { - _underlying.upgrade(newConnection); - } - - @Override - public synchronized int fill(final ByteBuffer dst) throws IOException - { - if (_marked) - { - final int oldLimit = dst.limit(); - final int i = _underlying.fill(dst); - int newLimit = dst.limit(); - - ByteBuffer buf = preserve(dst.duplicate(), newLimit, oldLimit); - _preserved.add(buf); - return i; - } - else - { - int i = 0; - if (!_preserved.isEmpty()) - { - i += fillFromPreserved(dst); - if (!_preserved.isEmpty()) - { - return i; - } - } - i += _underlying.fill(dst); - - return i; - } - } - - synchronized void mark() - { - if (!_preserved.isEmpty()) - { - throw new IllegalStateException("Already marked"); - } - _marked = true; - } - - synchronized void rewind() - { - if (!_marked) - { - throw new IllegalStateException("Not marked"); - } - _marked = false; - } - - private int fillFromPreserved(final ByteBuffer dst) - { - int filled = 0; - final int pos = BufferUtil.flipToFill(dst); - try - { - Iterator<ByteBuffer> bufferIterator = _preserved.iterator(); - while (bufferIterator.hasNext()) - { - ByteBuffer buffer = bufferIterator.next(); - final int bufRemaining = buffer.remaining(); - int dstRemaining = dst.remaining(); - if (dstRemaining >= bufRemaining) - { - dst.put(buffer); - } - else - { - ByteBuffer slice = buffer.slice(); - slice.limit(dstRemaining); - dst.put(slice); - buffer.position(buffer.position() + dstRemaining); - } - filled += bufRemaining - buffer.remaining(); - if (buffer.hasRemaining()) - { - return filled; - } - bufferIterator.remove(); - } - } - finally - { - BufferUtil.flipToFlush(dst, pos); - } - return filled; - } - - private ByteBuffer preserve(final ByteBuffer dst, final int newLimit, final int oldLimit) - { - ByteBuffer buf = BufferUtil.allocate(newLimit - oldLimit); - ByteBuffer slice = dst.slice(); - slice.position(oldLimit); - slice.limit(newLimit); - BufferUtil.append(buf, slice); - return buf; - } -} diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java deleted file mode 100644 index c3d0ac7..0000000 --- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java +++ /dev/null @@ -1,375 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.management.plugin.portunification; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLSession; - -import org.eclipse.jetty.io.AbstractConnection; -import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.ssl.SslConnection; -import org.eclipse.jetty.io.ssl.SslHandshakeListener; -import org.eclipse.jetty.server.AbstractConnectionFactory; -import org.eclipse.jetty.server.ConnectionFactory; -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.annotation.Name; -import org.eclipse.jetty.util.component.ContainerLifeCycle; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.ssl.SslContextFactory; - -public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory -{ - private final SslContextFactory _sslContextFactory; - private final String _nextProtocol; - - public TlsOrPlainConnectionFactory(@Name("sslContextFactory") final SslContextFactory factory, - @Name("nextProtocol") final String nextProtocol) - { - super("SSL"); - _sslContextFactory = factory == null ? new SslContextFactory.Server() : factory; - _nextProtocol = nextProtocol; - this.addBean(this._sslContextFactory); - } - - @Override - protected void doStart() throws Exception - { - super.doStart(); - - final SSLEngine engine = _sslContextFactory.newSSLEngine(); - engine.setUseClientMode(false); - final SSLSession session = engine.getSession(); - if (session.getPacketBufferSize() > this.getInputBufferSize()) - { - this.setInputBufferSize(session.getPacketBufferSize()); - } - engine.closeInbound(); - engine.closeOutbound(); - } - - @Override - public PlainOrTlsConnection newConnection(final Connector connector, final EndPoint realEndPoint) - { - final MarkableEndPoint endPoint = new MarkableEndPoint(realEndPoint); - final PlainOrTlsConnection plainOrTlsConnection = new PlainOrTlsConnection(connector, endPoint); - endPoint.setConnection(plainOrTlsConnection); - - return plainOrTlsConnection; - - } - - @Override - public String toString() - { - return String.format("%s@%x{%s->%s}", - this.getClass().getSimpleName(), - this.hashCode(), - this.getProtocol(), - this._nextProtocol); - } - - class PlainOrTlsConnection implements Connection - { - private final Logger LOG = Log.getLogger(PlainOrTlsConnection.class); - - private static final int TLS_HEADER_SIZE = 6; - - private final long _created = System.currentTimeMillis(); - private final Connector _connector; - private final MarkableEndPoint _endPoint; - private final Callback _fillableCallback = new Callback() - { - @Override - public void succeeded() - { - onFillable(); - } - }; - - private final ByteBuffer _tlsDeterminationBuf = BufferUtil.allocate(TLS_HEADER_SIZE); - private final List<Listener> _listeners = new CopyOnWriteArrayList<>(); - private volatile AbstractConnection _actualConnection; - - PlainOrTlsConnection(final Connector connector, final MarkableEndPoint endPoint) - { - _connector = connector; - _endPoint = endPoint; - _endPoint.mark(); - } - - @Override - public void addListener(Listener listener) - { - _listeners.add(listener); - AbstractConnection actualConnection = _actualConnection; - if (actualConnection != null) - { - actualConnection.addListener(listener); - } - } - - @Override - public void removeListener(Listener listener) - { - _listeners.remove(listener); - AbstractConnection actualConnection = _actualConnection; - if (actualConnection != null) - { - actualConnection.removeListener(listener); - } - } - - @Override - public void onOpen() - { - if (LOG.isDebugEnabled()) - { - LOG.debug("onOpen {}", this); - } - _endPoint.fillInterested(_fillableCallback); - - for (Listener listener : _listeners) - { - listener.onOpened(this); - } - - final AbstractConnection actualConnection = _actualConnection; - if (actualConnection != null) - { - actualConnection.onOpen(); - } - } - - @Override - public void onClose() - { - if (LOG.isDebugEnabled()) - { - LOG.debug("onClose {}", this); - } - - final AbstractConnection actualConnection = _actualConnection; - if (actualConnection != null) - { - actualConnection.onClose(); - } - - for (Listener listener : _listeners) - { - listener.onClosed(this); - } - } - - @Override - public EndPoint getEndPoint() - { - return _endPoint; - } - - @Override - public void close() - { - try - { - if (_endPoint != null) - { - _endPoint.close(); - } - } - finally - { - if (_actualConnection != null) - { - _actualConnection.close(); - } - } - } - - @Override - public boolean onIdleExpired() - { - return _actualConnection == null || _actualConnection.onIdleExpired(); - } - - @Override - public long getMessagesIn() - { - return _actualConnection == null ? -1L : _actualConnection.getMessagesIn(); - } - - @Override - public long getMessagesOut() - { - return _actualConnection == null ? -1L : _actualConnection.getMessagesOut(); - } - - @Override - public long getBytesIn() - { - return _actualConnection == null ? -1L : _actualConnection.getBytesIn(); - } - - @Override - public long getBytesOut() - { - return _actualConnection == null ? -1L : _actualConnection.getBytesOut(); - } - - @Override - public long getCreatedTimeStamp() - { - return _created; - } - - @Override - public final String toString() - { - return String.format("%s<-%s",toConnectionString(),getEndPoint()); - } - - String toConnectionString() - { - return String.format("%s@%h", - getClass().getSimpleName(), - this); - } - - void onFillable() - { - if (_actualConnection != null) - { - _actualConnection.onFillable(); - } - else - { - try - { - int filled = getEndPoint().fill(_tlsDeterminationBuf); - if (filled < 0) - { - close(); - return; - } - - int remaining = _tlsDeterminationBuf.remaining(); - if (remaining >= TLS_HEADER_SIZE) - { - final byte[] array = _tlsDeterminationBuf.array(); - - boolean isTLS = looksLikeSSLv2ClientHello(array) || looksLikeSSLv3ClientHello(array); - LOG.debug("new connection tls={} endpoint address={}", isTLS, getEndPoint().getRemoteAddress()); - - _endPoint.rewind(); - - if (isTLS) - { - final SSLEngine engine = _sslContextFactory.newSSLEngine(_endPoint.getRemoteAddress()); - engine.setUseClientMode(false); - - final SslConnection sslConnection = newSslConnection(_connector, _endPoint, engine); - sslConnection.setInputBufferSize(getInputBufferSize()); - sslConnection.setRenegotiationAllowed(_sslContextFactory.isRenegotiationAllowed()); - _actualConnection = sslConnection; - - if (_connector instanceof ContainerLifeCycle) - { - ContainerLifeCycle container = (ContainerLifeCycle)_connector; - container.getBeans(SslHandshakeListener.class).forEach(sslConnection::addHandshakeListener); - } - getBeans(SslHandshakeListener.class).forEach(sslConnection::addHandshakeListener); - - - ConnectionFactory next = _connector.getConnectionFactory(_nextProtocol); - - final EndPoint decryptedEndPoint = sslConnection.getDecryptedEndPoint(); - Connection connection = next.newConnection(_connector, decryptedEndPoint); - decryptedEndPoint.setConnection(connection); - } - else - { - final ConnectionFactory next = _connector.getConnectionFactory(_nextProtocol); - _actualConnection = (AbstractConnection) next.newConnection(_connector, _endPoint); - _endPoint.setConnection(_actualConnection); - } - - _actualConnection.onOpen(); - - for (Listener listener : _listeners) - { - _actualConnection.addListener(listener); - } - } - else - { - LOG.debug("Too few bytes to make determination received : {} required: {}", - remaining, TLS_HEADER_SIZE); - - _endPoint.fillInterested(_fillableCallback); - } - } - catch (IOException e) - { - close(); - } - } - } - - private boolean looksLikeSSLv3ClientHello(byte[] headerBytes) - { - return headerBytes[0] == 22 && // SSL Handshake - (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[2] == 0 || // SSL 3.0 - headerBytes[2] == 1 || // TLS 1.0 - headerBytes[2] == 2 || // TLS 1.1 - headerBytes[2] == 3)) && // TLS1.2 - (headerBytes[5] == 1); // client_hello - } - - private boolean looksLikeSSLv2ClientHello(byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - private SslConnection newSslConnection(final Connector connector, final EndPoint endPoint, final SSLEngine engine) - { - final SslConnection sslConnection = - new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine); - TlsOrPlainConnectionFactory.this.configure(sslConnection, _connector, _endPoint); - return sslConnection; - } - - - } -} diff --git a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactoryTest.java b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactoryTest.java deleted file mode 100644 index 73ca922..0000000 --- a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactoryTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.management.plugin.portunification; - - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLSession; - -import org.eclipse.jetty.io.AbstractConnection; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.server.ConnectionFactory; -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.junit.Before; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import org.apache.qpid.test.utils.UnitTestBase; - -public class TlsOrPlainConnectionFactoryTest extends UnitTestBase -{ - private SslContextFactory _sslContextFactory; - private Connector _connector; - private EndPoint _endPoint; - private AbstractConnection _actualConnection; - - private TlsOrPlainConnectionFactory _factory; - - @Before - public void setUp() throws Exception - { - - _sslContextFactory = mock(SslContextFactory.class); - SSLEngine sslEngine = mock(SSLEngine.class); - when(_sslContextFactory.newSSLEngine(any())).thenReturn(sslEngine); - final SSLSession sslSession = mock(SSLSession.class); - when(sslEngine.getSession()).thenReturn(sslSession); - when(sslSession.getPacketBufferSize()).thenReturn(Integer.MAX_VALUE); - - _factory = new TlsOrPlainConnectionFactory(_sslContextFactory, "test"); - - _actualConnection = mock(AbstractConnection.class); - - _connector = mock(Connector.class); - when(_connector.getExecutor()).thenReturn(mock(Executor.class)); - ConnectionFactory connectionFactory = mock(ConnectionFactory.class); - when(_connector.getConnectionFactory(anyString())).thenReturn(connectionFactory); - when(connectionFactory.newConnection(any(), any())).thenReturn(_actualConnection); - - _endPoint = mock(EndPoint.class); - } - - @Test - public void testOnFillableForTLS() throws Exception - { - AtomicBoolean firstPart = new AtomicBoolean(true); - Answer<Object> answer = (InvocationOnMock invocation) -> - { - ByteBuffer dst = - (ByteBuffer) invocation.getArguments()[0]; - if (firstPart.get()) - { - firstPart.set(false); - return writeBytes(dst, - (byte) 22, - (byte) 3, - (byte) 1); - } - return writeBytes(dst, - (byte) 0, - (byte) 0, - (byte) 1); - }; - when(_endPoint.fill(any(ByteBuffer.class))).thenAnswer(answer); - - TlsOrPlainConnectionFactory.PlainOrTlsConnection connection = _factory.newConnection(_connector, _endPoint); - - connection.onFillable(); - - verify(_endPoint).fillInterested(any(Callback.class)); - - connection.onFillable(); - - verify(_actualConnection).onOpen(); - verify(_sslContextFactory).newSSLEngine(any()); - - ByteBuffer buffer = BufferUtil.allocate(4); - int result = connection.getEndPoint().fill(buffer); - assertEquals((long) 4, (long) result); - - assertTrue(Arrays.equals(new byte[]{(byte) 22, (byte) 3, (byte) 1, (byte) 0}, buffer.array())); - buffer = BufferUtil.allocate(2); - - result = connection.getEndPoint().fill(buffer); - assertEquals((long) 2, (long) result); - assertTrue(Arrays.equals(new byte[]{(byte) 0, (byte) 1}, buffer.array())); - verify(_endPoint, times(3)).fill(any()); - } - - @Test - public void testOnFillableForPlain() throws Exception - { - AtomicBoolean firstPart = new AtomicBoolean(true); - Answer<Object> answer = (InvocationOnMock invocation) -> - { - ByteBuffer dst = - (ByteBuffer) invocation.getArguments()[0]; - if (firstPart.get()) - { - firstPart.set(false); - return writeBytes(dst, - "HTTP 1".getBytes()); - } - return writeBytes(dst, - ".1\n\n".getBytes()); - }; - when(_endPoint.fill(any(ByteBuffer.class))).thenAnswer(answer); - - TlsOrPlainConnectionFactory.PlainOrTlsConnection connection = _factory.newConnection(_connector, _endPoint); - - connection.onFillable(); - - verify(_actualConnection).onOpen(); - verify(_sslContextFactory, times(0)).newSSLEngine(any()); - verify(_endPoint).fill(any()); - - ByteBuffer buffer = BufferUtil.allocate(4); - int result = connection.getEndPoint().fill(buffer); - assertEquals((long) 4, (long) result); - - assertEquals("HTTP", new String(buffer.array())); - - buffer = BufferUtil.allocate(6); - result = connection.getEndPoint().fill(buffer); - assertEquals((long) 6, (long) result); - assertEquals(" 1.1\n\n", new String(buffer.array())); - verify(_endPoint, times(2)).fill(any()); - } - - private int writeBytes(ByteBuffer dst, byte... toWrite) - { - int written = 0; - if (BufferUtil.space(dst) > 0) - { - final int pos = BufferUtil.flipToFill(dst); - try - { - for (; written < toWrite.length; written++) - { - dst.put(toWrite[written]); - } - } - finally - { - BufferUtil.flipToFlush(dst, pos); - } - } - return written; - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org