This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 9653a0b Revert "GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)" 9653a0b is described below commit 9653a0b6e490272fa77d375049f0e9f1cb6c8929 Author: Bill Burcham <bill.burc...@gmail.com> AuthorDate: Thu Nov 5 12:52:02 2020 -0800 Revert "GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)" This reverts commit 08e9e9673d0ed05555a3d74c6d16e706817cab09. --- .../tcp/ConnectionCloseSSLTLSDUnitTest.java | 238 ------------ .../org/apache/geode/internal/tcp/server.keystore | Bin 1256 -> 0 bytes ...LSocketHostNameVerificationIntegrationTest.java | 4 +- .../internal/net/SSLSocketIntegrationTest.java | 57 +-- .../apache/geode/codeAnalysis/excludedClasses.txt | 1 - .../geode/internal/net/ByteBufferSharing.java | 55 --- .../geode/internal/net/ByteBufferSharingImpl.java | 148 ------- .../geode/internal/net/ByteBufferSharingNoOp.java | 52 --- .../org/apache/geode/internal/net/NioFilter.java | 69 ++-- .../apache/geode/internal/net/NioPlainEngine.java | 27 +- .../apache/geode/internal/net/NioSslEngine.java | 367 +++++++++-------- .../org/apache/geode/internal/tcp/Connection.java | 34 +- .../org/apache/geode/internal/tcp/MsgReader.java | 15 +- .../internal/net/ByteBufferSharingImplTest.java | 163 -------- .../geode/internal/net/NioPlainEngineTest.java | 47 +-- .../geode/internal/net/NioSslEngineTest.java | 432 ++++++++++----------- 16 files changed, 486 insertions(+), 1223 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java deleted file mode 100644 index 77fe9bf..0000000 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.tcp; - -import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS; -import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION; -import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; -import static org.apache.geode.distributed.ConfigurationProperties.NAME; -import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE; -import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME; -import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS; -import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE; -import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD; -import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS; -import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION; -import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE; -import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD; -import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION; -import static org.apache.geode.test.dunit.VM.getVM; -import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Fail.fail; - -import java.io.File; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.Properties; -import java.util.concurrent.TimeoutException; - -import org.apache.logging.log4j.Logger; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionShortcut; -import org.apache.geode.distributed.DistributedSystemDisconnectedException; -import org.apache.geode.distributed.Locator; -import org.apache.geode.distributed.internal.ClusterDistributionManager; -import org.apache.geode.distributed.internal.DistributionMessage; -import org.apache.geode.distributed.internal.DistributionMessageObserver; -import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave; -import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage; -import org.apache.geode.logging.internal.log4j.api.LogService; -import org.apache.geode.test.dunit.AsyncInvocation; -import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.rules.DistributedBlackboard; -import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; -import org.apache.geode.test.dunit.rules.DistributedRule; - -/** - * It would be nice if this test didn't need to use the cache since the test's purpose is to test - * that the {@link Connection} class can be closed while readers and writers hold locks on its - * internal TLS {@link ByteBuffer}s - * - * But this test does use the cache (region) because it enabled us to use existing cache messaging - * and to use the DistributionMessageObserver (observer) hooks. - * - * see also ClusterCommunicationsDUnitTest - */ -public class ConnectionCloseSSLTLSDUnitTest implements Serializable { - - private static final int SMALL_BUFFER_SIZE = 8000; - private static final String UPDATE_ENTERED_GATE = "connectionCloseDUnitTest.regionUpdateEntered"; - private static final String SUSPEND_UPDATE_GATE = "connectionCloseDUnitTest.suspendRegionUpdate"; - private static final String regionName = "connectionCloseDUnitTestRegion"; - private static final Logger logger = LogService.getLogger(); - - private static Cache cache; - - @Rule - public DistributedRule distributedRule = - DistributedRule.builder().withVMCount(3).build(); - - @Rule - public DistributedBlackboard blackboard = new DistributedBlackboard(); - - @Rule - public DistributedRestoreSystemProperties restoreSystemProperties = - new DistributedRestoreSystemProperties(); - - private VM locator; - private VM sender; - private VM receiver; - - @Before - public void before() { - locator = getVM(0); - sender = getVM(1); - receiver = getVM(2); - } - - @After - public void after() { - receiver.invoke(() -> { - DistributionMessageObserver.setInstance(null); - }); - } - - @Test - public void connectionWithHungReaderIsCloseableAndUnhangsReader() - throws InterruptedException, TimeoutException { - - blackboard.clearGate(UPDATE_ENTERED_GATE); - blackboard.clearGate(SUSPEND_UPDATE_GATE); - - final int locatorPort = createLocator(locator); - createCacheAndRegion(sender, locatorPort); - createCacheAndRegion(receiver, locatorPort); - - receiver - .invoke("set up DistributionMessageObserver to 'hang' sender's put (on receiver)", - () -> { - final DistributionMessageObserver observer = - new DistributionMessageObserver() { - - @Override - public void beforeProcessMessage(final ClusterDistributionManager dm, - final DistributionMessage message) { - guardMessageProcessingHook(message, () -> { - try { - blackboard.signalGate(UPDATE_ENTERED_GATE); - blackboard.waitForGate(SUSPEND_UPDATE_GATE); - } catch (TimeoutException | InterruptedException e) { - fail("message observus interruptus"); - } - logger.info("BGB: got before process message: " + message); - }); - } - }; - DistributionMessageObserver.setInstance(observer); - }); - - final AsyncInvocation<Object> putInvocation = sender.invokeAsync("try a put", () -> { - final Region<Object, Object> region = cache.getRegion(regionName); - // test is going to close the cache while we are waiting for our ack - assertThatThrownBy(() -> { - region.put("hello", "world"); - }).isInstanceOf(DistributedSystemDisconnectedException.class); - }); - - // wait until our message observer is blocked - blackboard.waitForGate(UPDATE_ENTERED_GATE); - - // at this point our put() is blocked waiting for a direct ack - assertThat(putInvocation.isAlive()).as("put is waiting for remote region to ack").isTrue(); - - /* - * Now close the cache. The point of calling it is to test that we don't block while trying - * to close connections. Cache.close() calls DistributedSystem.disconnect() which in turn - * closes all the connections (and their sockets.) We want the sockets to close because that'll - * cause our hung put() to see a DistributedSystemDisconnectedException. - */ - sender.invoke("", () -> cache.close()); - - // wait for put task to complete: with an exception, that is! - putInvocation.get(); - - // un-stick our message observer - blackboard.signalGate(SUSPEND_UPDATE_GATE); - } - - private void guardMessageProcessingHook(final DistributionMessage message, - final Runnable runnable) { - if (message instanceof UpdateMessage) { - final UpdateMessage updateMessage = (UpdateMessage) message; - if (updateMessage.getRegionPath().equals("/" + regionName)) { - runnable.run(); - } - } - } - - private int createLocator(VM memberVM) { - return memberVM.invoke("create locator", () -> { - // if you need to debug SSL communications use this property: - // System.setProperty("javax.net.debug", "all"); - System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true"); - return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties()) - .getPort(); - }); - } - - private void createCacheAndRegion(VM memberVM, int locatorPort) { - memberVM.invoke("start cache and create region", () -> { - cache = createCache(locatorPort); - cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName); - }); - } - - private Cache createCache(int locatorPort) { - // if you need to debug SSL communications use this property: - // System.setProperty("javax.net.debug", "all"); - Properties properties = getDistributedSystemProperties(); - properties.setProperty(LOCATORS, "localhost[" + locatorPort + "]"); - return new CacheFactory(properties).create(); - } - - private Properties getDistributedSystemProperties() { - Properties properties = new Properties(); - properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false"); - properties.setProperty(USE_CLUSTER_CONFIGURATION, "false"); - properties.setProperty(NAME, "vm" + VM.getCurrentVMNum()); - properties.setProperty(CONSERVE_SOCKETS, "false"); // we are testing direct ack - properties.setProperty(SOCKET_LEASE_TIME, "10000"); - properties.setProperty(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE); - - properties.setProperty(SSL_ENABLED_COMPONENTS, "cluster,locator"); - properties - .setProperty(SSL_KEYSTORE, createTempFileFromResource(getClass(), "server.keystore") - .getAbsolutePath()); - properties.setProperty(SSL_TRUSTSTORE, - createTempFileFromResource(getClass(), "server.keystore") - .getAbsolutePath()); - properties.setProperty(SSL_PROTOCOLS, "TLSv1.2"); - properties.setProperty(SSL_KEYSTORE_PASSWORD, "password"); - properties.setProperty(SSL_TRUSTSTORE_PASSWORD, "password"); - properties.setProperty(SSL_REQUIRE_AUTHENTICATION, "true"); - return properties; - } - -} diff --git a/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore deleted file mode 100644 index 8b5305f..0000000 Binary files a/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore and /dev/null differ diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java index a70f3b1..dc7df44 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java @@ -215,9 +215,7 @@ public class SSLSocketHostNameVerificationIntegrationTest { final NioSslEngine nioSslEngine = engine; engine.close(socket.getChannel()); assertThatThrownBy(() -> { - try (final ByteBufferSharing unused = - nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) { - } + nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0])); }) .isInstanceOf(IOException.class); } diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java index add6b9a..19eab4f 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java @@ -256,13 +256,11 @@ public class SSLSocketIntegrationTest { ByteBuffer buffer = bbos.getContentBuffer(); System.out.println( "client buffer position is " + buffer.position() + " and limit is " + buffer.limit()); - try (final ByteBufferSharing outputSharing = engine.wrap(buffer)) { - ByteBuffer wrappedBuffer = outputSharing.getBuffer(); - System.out.println("client wrapped buffer position is " + wrappedBuffer.position() - + " and limit is " + wrappedBuffer.limit()); - int bytesWritten = clientChannel.write(wrappedBuffer); - System.out.println("client bytes written is " + bytesWritten); - } + ByteBuffer wrappedBuffer = engine.wrap(buffer); + System.out.println("client wrapped buffer position is " + wrappedBuffer.position() + + " and limit is " + wrappedBuffer.limit()); + int bytesWritten = clientChannel.write(wrappedBuffer); + System.out.println("client bytes written is " + bytesWritten); } private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis) @@ -301,9 +299,7 @@ public class SSLSocketIntegrationTest { final NioSslEngine nioSslEngine = engine; engine.close(socket.getChannel()); assertThatThrownBy(() -> { - try (final ByteBufferSharing unused = - nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) { - } + nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0])); }) .isInstanceOf(IOException.class); } @@ -317,35 +313,24 @@ public class SSLSocketIntegrationTest { private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine) throws IOException { - try (final ByteBufferSharing sharedBuffer = engine.getUnwrappedBuffer()) { - final ByteBuffer unwrapped = sharedBuffer.getBuffer(); - // if we already have unencrypted data skip unwrapping - if (unwrapped.position() == 0) { - int bytesRead; - // if we already have encrypted data skip reading from the socket - if (buffer.position() == 0) { - bytesRead = socket.getChannel().read(buffer); - buffer.flip(); - } else { - bytesRead = buffer.remaining(); - } - System.out.println("server bytes read is " + bytesRead + ": buffer position is " - + buffer.position() + " and limit is " + buffer.limit()); - try (final ByteBufferSharing sharedBuffer2 = engine.unwrap(buffer)) { - final ByteBuffer unwrapped2 = sharedBuffer2.getBuffer(); - - unwrapped2.flip(); - System.out.println("server unwrapped buffer position is " + unwrapped2.position() - + " and limit is " + unwrapped2.limit()); - finishReadMessageFromNIOSSLClient(unwrapped2); - } + ByteBuffer unwrapped = engine.getUnwrappedBuffer(buffer); + // if we already have unencrypted data skip unwrapping + if (unwrapped.position() == 0) { + int bytesRead; + // if we already have encrypted data skip reading from the socket + if (buffer.position() == 0) { + bytesRead = socket.getChannel().read(buffer); + buffer.flip(); } else { - finishReadMessageFromNIOSSLClient(unwrapped); + bytesRead = buffer.remaining(); } + System.out.println("server bytes read is " + bytesRead + ": buffer position is " + + buffer.position() + " and limit is " + buffer.limit()); + unwrapped = engine.unwrap(buffer); + unwrapped.flip(); + System.out.println("server unwrapped buffer position is " + unwrapped.position() + + " and limit is " + unwrapped.limit()); } - } - - private void finishReadMessageFromNIOSSLClient(final ByteBuffer unwrapped) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped); DataInputStream dis = new DataInputStream(bbis); String welcome = dis.readUTF(); diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt index 33f43c3..a46d5fc 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt @@ -104,4 +104,3 @@ org/apache/geode/cache/query/internal/xml/ElementType org/apache/geode/cache/query/internal/xml/ElementType$1 org/apache/geode/cache/query/internal/xml/ElementType$2 org/apache/geode/cache/query/internal/xml/ElementType$3 -org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut \ No newline at end of file diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java deleted file mode 100644 index cdfa897..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.net; - -import java.io.IOException; -import java.nio.ByteBuffer; - - -/** - * When a {@link ByteBufferSharing} is acquired in a try-with-resources the buffer is available (for - * reading and modification) within the scope of that try block. - * - * Releases managed ByteBuffer back to pool after last reference is dropped. - */ -public interface ByteBufferSharing extends AutoCloseable { - - /** - * Call this method only within a try-with-resource in which this {@link ByteBufferSharing} was - * acquired. Retain the reference only within the scope of that try-with-resources. - * - * @return the buffer: manipulable only within the scope of the try-with-resources - * @throws IOException if the buffer is no longer accessible - */ - ByteBuffer getBuffer() throws IOException; - - /** - * Expand the buffer if needed. This may return a different object so be sure to pay attention to - * the return value if you need access to the potentially- expanded buffer. - * - * Subsequent calls to {@link #getBuffer()} will return that new buffer too. - * - * @return the same buffer or a different (bigger) buffer - * @throws IOException if the buffer is no longer accessible - */ - ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException; - - /** - * Override {@link AutoCloseable#close()} without throws clause since we don't need one. - */ - @Override - void close(); -} diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java deleted file mode 100644 index e9a941e..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.net; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.geode.annotations.VisibleForTesting; -import org.apache.geode.internal.net.BufferPool.BufferType; - -/** - * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a - * {@link ByteBuffer}) is available (for reading and modification) in the scope of the - * try-with-resources. - */ -class ByteBufferSharingImpl implements ByteBufferSharing { - - static class OpenAttemptTimedOut extends Exception { - } - - private final Lock lock; - private final AtomicBoolean isClosed; - // mutable because in general our ByteBuffer may need to be resized (grown or compacted) - private ByteBuffer buffer; - private final BufferType bufferType; - private final AtomicInteger counter; - private final BufferPool bufferPool; - - /** - * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}). - * - * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed - * to an external object or is returned to an external caller.) - * - * This constructor acquires no lock. The reference count will be 1 after this constructor - * completes. - */ - ByteBufferSharingImpl(final ByteBuffer buffer, final BufferType bufferType, - final BufferPool bufferPool) { - this.buffer = buffer; - this.bufferType = bufferType; - this.bufferPool = bufferPool; - lock = new ReentrantLock(); - counter = new AtomicInteger(1); - isClosed = new AtomicBoolean(false); - } - - /** - * The destructor. Called by the resource owner to undo the work of the constructor. - */ - void destruct() { - if (isClosed.compareAndSet(false, true)) { - dropReference(); - } - } - - /** - * This method is for use only by the owner of the shared resource. It's used for handing out - * references to the shared resource. So it does reference counting and also acquires a lock. - * - * Resource owners call this method as the last thing before returning a reference to the caller. - * That caller binds that reference to a variable in a try-with-resources statement and relies on - * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block. - */ - ByteBufferSharing open() { - lock.lock(); - addReference(); - return this; - } - - /** - * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time. - */ - ByteBufferSharing open(final long time, final TimeUnit unit) throws OpenAttemptTimedOut { - try { - if (!lock.tryLock(time, unit)) { - throw new OpenAttemptTimedOut(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new OpenAttemptTimedOut(); - } - addReference(); - return this; - } - - @Override - public ByteBuffer getBuffer() throws IOException { - if (isClosed.get()) { - throw new IOException("NioSslEngine has been closed"); - } else { - return buffer; - } - } - - @Override - public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException { - return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity); - } - - @Override - public void close() { - /* - * We are counting on our ReentrantLock throwing an exception if the current thread - * does not hold the lock. In that case dropReference() will not be called. This - * prevents ill-behaved clients (clients that call close() too many times) from - * corrupting our reference count. - */ - lock.unlock(); - dropReference(); - } - - private int addReference() { - return counter.incrementAndGet(); - } - - private int dropReference() { - final int usages = counter.decrementAndGet(); - if (usages == 0) { - bufferPool.releaseBuffer(bufferType, buffer); - } - return usages; - } - - @VisibleForTesting - public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) { - buffer = newBufferForTesting; - } - -} diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java deleted file mode 100644 index bd707e3..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.net; - -import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a - * {@link ByteBuffer}) is available (for reading and modification) in the scope of the - * try-with-resources. - * - * This implementation is a "no-op". It performs no actual locking and no reference counting. It's - * meant for use with the {@link NioPlainEngine} only, since that engine keeps no buffers and so, - * needs no reference counting on buffers, nor any synchronization around access to buffers. - * - * See also {@link ByteBufferSharingImpl} - */ -class ByteBufferSharingNoOp implements ByteBufferSharing { - - private final ByteBuffer buffer; - - ByteBufferSharingNoOp(final ByteBuffer buffer) { - this.buffer = buffer; - } - - @Override - public ByteBuffer getBuffer() { - return buffer; - } - - @Override - public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException { - throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine"); - } - - @Override - public void close() {} -} diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java index eb53f0e..9c437ad 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java @@ -19,53 +19,47 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** - * Prior to transmitting a buffer or processing a received buffer a NioFilter should be called to - * wrap (transmit) or unwrap (received) the buffer in case SSL is being used.<br> - * Implementations of - * this class may not be thread-safe in regard to the buffers their methods return. These may be - * internal state that, if used concurrently by multiple threads could cause corruption. Appropriate - * external synchronization must be used in order to provide thread-safety. Do this by invoking - * getSynchObject() and synchronizing on the returned object while using the buffer. + * Prior to transmitting a buffer or processing a received buffer + * a NioFilter should be called to wrap (transmit) or unwrap (received) + * the buffer in case SSL is being used.<br> + * Implementations of this class may not be thread-safe in regard to + * the buffers their methods return. These may be internal state that, + * if used concurrently by multiple threads could cause corruption. + * Appropriate external synchronization must be used in order to provide + * thread-safety. Do this by invoking getSynchObject() and synchronizing on + * the returned object while using the buffer. */ public interface NioFilter { /** * wrap bytes for transmission to another process - * - * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is - * to call this method in a try-with-resources statement. */ - ByteBufferSharing wrap(ByteBuffer buffer) throws IOException; + ByteBuffer wrap(ByteBuffer buffer) throws IOException; /** - * unwrap bytes received from another process. The unwrapped buffer should be flipped before - * reading. When done reading invoke doneReading() to reset for future read ops - * - * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is - * to call this method in a try-with-resources statement. + * unwrap bytes received from another process. The unwrapped + * buffer should be flipped before reading. When done reading invoke + * doneReading() to reset for future read ops */ - ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException; + ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException; /** - * ensure that the wrapped buffer has enough room to read the given amount of data. This must be - * invoked before readAtLeast. A new buffer may be returned by this method. + * ensure that the wrapped buffer has enough room to read the given amount of data. + * This must be invoked before readAtLeast. A new buffer may be returned by this method. */ ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer, BufferPool.BufferType bufferType); /** - * read at least the indicated amount of bytes from the given socket. The buffer position will be - * ready for reading the data when this method returns. Note: you must invoke - * ensureWrappedCapacity with the given amount prior to each invocation of this method. + * read at least the indicated amount of bytes from the given + * socket. The buffer position will be ready for reading + * the data when this method returns. Note: you must invoke ensureWrappedCapacity + * with the given amount prior to each invocation of this method. * <br> * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br> - * unwrappedBuffer - * = filter.readAtLeast(channel, amount, wrappedBuffer, etc.) - * - * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is - * to call this method in a try-with-resources statement. + * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.) */ - ByteBufferSharing readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer) + ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer) throws IOException; /** @@ -87,19 +81,28 @@ public interface NioFilter { } } + default boolean isClosed() { + return false; + } + /** * invoke this method when you are done using the NioFilter + * */ default void close(SocketChannel socketChannel) { // nothing by default } /** - * Returns the sharing object for the {@link NioFilter}'s unwrapped buffer, if one exists. - * - * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is - * to call this method in a try-with-resources statement. + * returns the unwrapped byte buffer associated with the given wrapped buffer. */ - ByteBufferSharing getUnwrappedBuffer(); + ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer); + /** + * returns an object to be used in synchronizing on the use of buffers returned by + * a NioFilter. + */ + default Object getSynchObject() { + return this; + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java index 8b5df96..3ebce38 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import org.apache.geode.annotations.internal.MakeImmutable; import org.apache.geode.internal.Assert; /** @@ -28,12 +27,6 @@ import org.apache.geode.internal.Assert; * secure communications. */ public class NioPlainEngine implements NioFilter { - - // this variable requires the MakeImmutable annotation but the buffer is empty and - // not really modifiable - @MakeImmutable - private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); - private final BufferPool bufferPool; int lastReadPosition; @@ -45,14 +38,14 @@ public class NioPlainEngine implements NioFilter { } @Override - public ByteBufferSharing wrap(ByteBuffer buffer) { - return shareBuffer(buffer); + public ByteBuffer wrap(ByteBuffer buffer) { + return buffer; } @Override - public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) { + public ByteBuffer unwrap(ByteBuffer wrappedBuffer) { wrappedBuffer.position(wrappedBuffer.limit()); - return shareBuffer(wrappedBuffer); + return wrappedBuffer; } @Override @@ -89,7 +82,7 @@ public class NioPlainEngine implements NioFilter { } @Override - public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer) + public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer) throws IOException { ByteBuffer buffer = wrappedBuffer; @@ -115,7 +108,7 @@ public class NioPlainEngine implements NioFilter { buffer.position(lastProcessedPosition); lastProcessedPosition += bytes; - return shareBuffer(buffer); + return buffer; } public void doneReading(ByteBuffer unwrappedBuffer) { @@ -128,12 +121,8 @@ public class NioPlainEngine implements NioFilter { } @Override - public ByteBufferSharing getUnwrappedBuffer() { - return shareBuffer(EMPTY_BUFFER); - } - - private ByteBufferSharingNoOp shareBuffer(final ByteBuffer wrappedBuffer) { - return new ByteBufferSharingNoOp(wrappedBuffer); + public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) { + return wrappedBuffer; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java index d2415e1..bacd538 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java @@ -40,19 +40,24 @@ import javax.net.ssl.SSLSession; import org.apache.logging.log4j.Logger; import org.apache.geode.GemFireIOException; -import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.annotations.internal.MakeImmutable; import org.apache.geode.internal.net.BufferPool.BufferType; -import org.apache.geode.internal.net.ByteBufferSharingImpl.OpenAttemptTimedOut; import org.apache.geode.logging.internal.log4j.api.LogService; /** - * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread safe. - * Its use should be confined to one thread or should be protected by external synchronization. + * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread + * safe. Its use should be confined to one thread or should be protected by external + * synchronization. */ public class NioSslEngine implements NioFilter { private static final Logger logger = LogService.getLogger(); + // this variable requires the MakeImmutable annotation but the buffer is empty and + // not really modifiable + @MakeImmutable + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); + private final BufferPool bufferPool; private boolean closed; @@ -60,28 +65,23 @@ public class NioSslEngine implements NioFilter { SSLEngine engine; /** - * holds bytes wrapped by the SSLEngine; a.k.a. myNetData + * myNetData holds bytes wrapped by the SSLEngine */ - private final ByteBufferSharingImpl outputSharing; + ByteBuffer myNetData; /** - * holds the last unwrapped data from a peer; a.k.a. peerAppData + * peerAppData holds the last unwrapped data from a peer */ - private final ByteBufferSharingImpl inputSharing; + ByteBuffer peerAppData; NioSslEngine(SSLEngine engine, BufferPool bufferPool) { SSLSession session = engine.getSession(); int appBufferSize = session.getApplicationBufferSize(); int packetBufferSize = engine.getSession().getPacketBufferSize(); - closed = false; this.engine = engine; this.bufferPool = bufferPool; - outputSharing = - new ByteBufferSharingImpl(bufferPool.acquireDirectSenderBuffer(packetBufferSize), - TRACKED_SENDER, bufferPool); - inputSharing = - new ByteBufferSharingImpl(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize), - TRACKED_RECEIVER, bufferPool); + this.myNetData = bufferPool.acquireDirectSenderBuffer(packetBufferSize); + this.peerAppData = bufferPool.acquireNonDirectReceiveBuffer(appBufferSize); } /** @@ -135,65 +135,57 @@ public class NioSslEngine implements NioFilter { switch (status) { case NEED_UNWRAP: - try (final ByteBufferSharing inputSharing = shareInputBuffer()) { - final ByteBuffer peerAppData = inputSharing.getBuffer(); - - // Receive handshaking data from peer - int dataRead = socketChannel.read(handshakeBuffer); - - // Process incoming handshaking data - handshakeBuffer.flip(); - - - engineResult = engine.unwrap(handshakeBuffer, peerAppData); - handshakeBuffer.compact(); - status = engineResult.getHandshakeStatus(); - - // if we're not finished, there's nothing to process and no data was read let's hang out - // for a little - if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) { - Thread.sleep(10); - } + // Receive handshaking data from peer + int dataRead = socketChannel.read(handshakeBuffer); + + // Process incoming handshaking data + handshakeBuffer.flip(); + engineResult = engine.unwrap(handshakeBuffer, peerAppData); + handshakeBuffer.compact(); + status = engineResult.getHandshakeStatus(); + + // if we're not finished, there's nothing to process and no data was read let's hang out + // for a little + if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) { + Thread.sleep(10); + } - if (engineResult.getStatus() == BUFFER_OVERFLOW) { - inputSharing.expandWriteBufferIfNeeded(peerAppData.capacity() * 2); - } - break; + if (engineResult.getStatus() == BUFFER_OVERFLOW) { + peerAppData = + expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2); } + break; case NEED_WRAP: - try (final ByteBufferSharing outputSharing = shareOutputBuffer()) { - final ByteBuffer myNetData = outputSharing.getBuffer(); - - // Empty the local network packet buffer. - myNetData.clear(); - - // Generate handshaking data - engineResult = engine.wrap(myAppData, myNetData); - status = engineResult.getHandshakeStatus(); - - // Check status - switch (engineResult.getStatus()) { - case BUFFER_OVERFLOW: - // no need to assign return value because we will never reference it - outputSharing.expandWriteBufferIfNeeded(myNetData.capacity() * 2); - break; - case OK: - myNetData.flip(); - // Send the handshaking data to peer - while (myNetData.hasRemaining()) { - socketChannel.write(myNetData); - } - break; - case CLOSED: - break; - default: - logger.info("handshake terminated with illegal state due to {}", status); - throw new IllegalStateException( - "Unknown SSLEngineResult status: " + engineResult.getStatus()); - } - break; + // Empty the local network packet buffer. + myNetData.clear(); + + // Generate handshaking data + engineResult = engine.wrap(myAppData, myNetData); + status = engineResult.getHandshakeStatus(); + + // Check status + switch (engineResult.getStatus()) { + case BUFFER_OVERFLOW: + myNetData = + expandWriteBuffer(TRACKED_SENDER, myNetData, + myNetData.capacity() * 2); + break; + case OK: + myNetData.flip(); + // Send the handshaking data to peer + while (myNetData.hasRemaining()) { + socketChannel.write(myNetData); + } + break; + case CLOSED: + break; + default: + logger.info("handshake terminated with illegal state due to {}", status); + throw new IllegalStateException( + "Unknown SSLEngineResult status: " + engineResult.getStatus()); } + break; case NEED_TASK: // Handle blocking tasks handleBlockingTasks(); @@ -221,6 +213,17 @@ public class NioSslEngine implements NioFilter { return true; } + ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing, + int desiredCapacity) { + return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity); + } + + synchronized void checkClosed() throws IOException { + if (closed) { + throw new IOException("NioSslEngine has been closed"); + } + } + void handleBlockingTasks() { Runnable task; while ((task = engine.getDelegatedTask()) != null) { @@ -230,84 +233,79 @@ public class NioSslEngine implements NioFilter { } @Override - public ByteBufferSharing wrap(ByteBuffer appData) throws IOException { - try (final ByteBufferSharing outputSharing = shareOutputBuffer()) { - - ByteBuffer myNetData = outputSharing.getBuffer(); + public synchronized ByteBuffer wrap(ByteBuffer appData) throws IOException { + checkClosed(); - myNetData.clear(); + myNetData.clear(); - while (appData.hasRemaining()) { - // ensure we have lots of capacity since encrypted data might - // be larger than the app data - int remaining = myNetData.capacity() - myNetData.position(); + while (appData.hasRemaining()) { + // ensure we have lots of capacity since encrypted data might + // be larger than the app data + int remaining = myNetData.capacity() - myNetData.position(); - if (remaining < (appData.remaining() * 2)) { - int newCapacity = expandedCapacity(appData, myNetData); - myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity); - } + if (remaining < (appData.remaining() * 2)) { + int newCapacity = expandedCapacity(appData, myNetData); + myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity); + } - SSLEngineResult wrapResult = engine.wrap(appData, myNetData); + SSLEngineResult wrapResult = engine.wrap(appData, myNetData); - if (wrapResult.getHandshakeStatus() == NEED_TASK) { - handleBlockingTasks(); - } + if (wrapResult.getHandshakeStatus() == NEED_TASK) { + handleBlockingTasks(); + } - if (wrapResult.getStatus() != OK) { - throw new SSLException("Error encrypting data: " + wrapResult); - } + if (wrapResult.getStatus() != OK) { + throw new SSLException("Error encrypting data: " + wrapResult); } + } - myNetData.flip(); + myNetData.flip(); - return shareOutputBuffer(); - } + return myNetData; } @Override - public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException { - try (final ByteBufferSharing inputSharing = shareInputBuffer()) { - - ByteBuffer peerAppData = inputSharing.getBuffer(); - - // note that we do not clear peerAppData as it may hold a partial - // message. TcpConduit, for instance, uses message chunking to - // transmit large payloads and we may have read a partial chunk - // during the previous unwrap - - peerAppData.limit(peerAppData.capacity()); - boolean stopDecryption = false; - while (wrappedBuffer.hasRemaining() && !stopDecryption) { - SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData); - switch (unwrapResult.getStatus()) { - case BUFFER_OVERFLOW: - // buffer overflow expand and try again - double the available decryption space - int newCapacity = - (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position(); - newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3); - peerAppData = inputSharing.expandWriteBufferIfNeeded(newCapacity); - peerAppData.limit(peerAppData.capacity()); - break; - case BUFFER_UNDERFLOW: - // partial data - need to read more. When this happens the SSLEngine will not have - // changed the buffer position - wrappedBuffer.compact(); - return shareInputBuffer(); - case OK: - break; - default: - // if there is data in the decrypted buffer return it. Otherwise signal that we're - // having trouble - if (peerAppData.position() <= 0) { - throw new SSLException("Error decrypting data: " + unwrapResult); - } - stopDecryption = true; - break; - } + public synchronized ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException { + checkClosed(); + + // note that we do not clear peerAppData as it may hold a partial + // message. TcpConduit, for instance, uses message chunking to + // transmit large payloads and we may have read a partial chunk + // during the previous unwrap + + peerAppData.limit(peerAppData.capacity()); + boolean stopDecryption = false; + while (wrappedBuffer.hasRemaining() && !stopDecryption) { + SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData); + switch (unwrapResult.getStatus()) { + case BUFFER_OVERFLOW: + // buffer overflow expand and try again - double the available decryption space + int newCapacity = + (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position(); + newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3); + peerAppData = + bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData, newCapacity); + peerAppData.limit(peerAppData.capacity()); + break; + case BUFFER_UNDERFLOW: + // partial data - need to read more. When this happens the SSLEngine will not have + // changed the buffer position + wrappedBuffer.compact(); + return peerAppData; + case OK: + break; + default: + // if there is data in the decrypted buffer return it. Otherwise signal that we're + // having trouble + if (peerAppData.position() <= 0) { + throw new SSLException("Error decrypting data: " + unwrapResult); + } + stopDecryption = true; + break; } - wrappedBuffer.clear(); - return shareInputBuffer(); } + wrappedBuffer.clear(); + return peerAppData; } @Override @@ -324,45 +322,50 @@ public class NioSslEngine implements NioFilter { } @Override - public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, + public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer) throws IOException { - try (final ByteBufferSharing inputSharing = shareInputBuffer()) { - - ByteBuffer peerAppData = inputSharing.getBuffer(); - - if (peerAppData.capacity() > bytes) { - // we already have a buffer that's big enough - if (peerAppData.capacity() - peerAppData.position() < bytes) { - peerAppData.compact(); - peerAppData.flip(); - } + if (peerAppData.capacity() > bytes) { + // we already have a buffer that's big enough + if (peerAppData.capacity() - peerAppData.position() < bytes) { + peerAppData.compact(); + peerAppData.flip(); } + } - while (peerAppData.remaining() < bytes) { - wrappedBuffer.limit(wrappedBuffer.capacity()); - int amountRead = channel.read(wrappedBuffer); - if (amountRead < 0) { - throw new EOFException(); - } - if (amountRead > 0) { - wrappedBuffer.flip(); - // prep the decoded buffer for writing - peerAppData.compact(); - try (final ByteBufferSharing inputSharing2 = unwrap(wrappedBuffer)) { - // done writing to the decoded buffer - prep it for reading again - final ByteBuffer peerAppDataNew = inputSharing2.getBuffer(); - peerAppDataNew.flip(); - peerAppData = peerAppDataNew; // loop needs new reference! - } - } + while (peerAppData.remaining() < bytes) { + wrappedBuffer.limit(wrappedBuffer.capacity()); + int amountRead = channel.read(wrappedBuffer); + if (amountRead < 0) { + throw new EOFException(); + } + if (amountRead > 0) { + wrappedBuffer.flip(); + // prep the decoded buffer for writing + peerAppData.compact(); + peerAppData = unwrap(wrappedBuffer); + // done writing to the decoded buffer - prep it for reading again + peerAppData.flip(); } - return shareInputBuffer(); } + return peerAppData; } @Override - public ByteBufferSharing getUnwrappedBuffer() { - return shareInputBuffer(); + public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) { + return peerAppData; + } + + /** + * ensures that the unwrapped buffer associated with the given wrapped buffer has + * sufficient capacity for the given amount of bytes. This may compact the + * buffer or it may return a new buffer. + */ + public ByteBuffer ensureUnwrappedCapacity(int amount) { + // for TTLS the app-data buffers do not need to be tracked direct-buffers since we + // do not use them for I/O operations + peerAppData = + bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount); + return peerAppData; } @Override @@ -373,14 +376,16 @@ public class NioSslEngine implements NioFilter { } @Override + public synchronized boolean isClosed() { + return closed; + } + + @Override public synchronized void close(SocketChannel socketChannel) { if (closed) { return; } - closed = true; - inputSharing.destruct(); - try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) { - final ByteBuffer myNetData = outputSharing.getBuffer(); + try { if (!engine.isOutboundDone()) { ByteBuffer empty = ByteBuffer.wrap(new byte[0]); @@ -407,13 +412,14 @@ public class NioSslEngine implements NioFilter { // we can't send a close message if the channel is closed } catch (IOException e) { throw new GemFireIOException("exception closing SSL session", e); - } catch (final OpenAttemptTimedOut _unused) { - logger.info(String.format("Couldn't get output lock in time, eliding TLS close message")); - if (!engine.isOutboundDone()) { - engine.closeOutbound(); - } } finally { - outputSharing.destruct(); + ByteBuffer netData = myNetData; + ByteBuffer appData = peerAppData; + myNetData = null; + peerAppData = EMPTY_BUFFER; + bufferPool.releaseBuffer(TRACKED_SENDER, netData); + bufferPool.releaseBuffer(TRACKED_RECEIVER, appData); + this.closed = true; } } @@ -422,17 +428,4 @@ public class NioSslEngine implements NioFilter { targetBuffer.capacity() * 2); } - @VisibleForTesting - public ByteBufferSharing shareOutputBuffer() { - return outputSharing.open(); - } - - private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit) - throws OpenAttemptTimedOut { - return outputSharing.open(time, unit); - } - - public ByteBufferSharing shareInputBuffer() { - return inputSharing.open(); - } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 844ab11..29d15e3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -78,7 +78,6 @@ import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.SystemTimer.SystemTimerTask; import org.apache.geode.internal.net.BufferPool; -import org.apache.geode.internal.net.ByteBufferSharing; import org.apache.geode.internal.net.NioFilter; import org.apache.geode.internal.net.NioPlainEngine; import org.apache.geode.internal.net.SocketCreator; @@ -803,12 +802,11 @@ public class Connection implements Runnable { @VisibleForTesting void clearSSLInputBuffer() { if (getConduit().useSSL() && ioFilter != null) { - try (final ByteBufferSharing sharedBuffer = ioFilter.getUnwrappedBuffer()) { - // clear out any remaining handshake bytes - try { - sharedBuffer.getBuffer().position(0).limit(0); - } catch (IOException e) { - // means the NioFilter was already closed + synchronized (ioFilter.getSynchObject()) { + if (!ioFilter.isClosed()) { + // clear out any remaining handshake bytes + ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer); + buffer.position(0).limit(0); } } } @@ -2455,9 +2453,8 @@ public class Connection implements Runnable { long queueTimeoutTarget = now + asyncQueueTimeout; channel.configureBlocking(false); try { - try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) { - final ByteBuffer wrappedBuffer = outputSharing.getBuffer(); - + synchronized (ioFilter.getSynchObject()) { + ByteBuffer wrappedBuffer = ioFilter.wrap(buffer); int waitTime = 1; do { owner.getConduit().getCancelCriterion().checkCancelInProgress(null); @@ -2610,9 +2607,9 @@ public class Connection implements Runnable { } // fall through } - try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) { - final ByteBuffer wrappedBuffer = outputSharing.getBuffer(); - + // synchronize on the ioFilter while using its network buffer + synchronized (ioFilter.getSynchObject()) { + ByteBuffer wrappedBuffer = ioFilter.wrap(buffer); while (wrappedBuffer.remaining() > 0) { int amtWritten = 0; long start = stats.startSocketWrite(true); @@ -2664,12 +2661,10 @@ public class Connection implements Runnable { final KnownVersion version = getRemoteVersion(); try { msgReader = new MsgReader(this, ioFilter, version); - ReplyMessage msg; int len; - // (we have to lock here to protect between reading header and message body) - try (final ByteBufferSharing _unused = ioFilter.getUnwrappedBuffer()) { + synchronized (ioFilter.getSynchObject()) { Header header = msgReader.readHeader(); if (header.getMessageType() == NORMAL_MSG_TYPE) { @@ -2686,7 +2681,7 @@ public class Connection implements Runnable { releaseMsgDestreamer(header.getMessageId(), destreamer); len = destreamer.size(); } - } + } // sync // I'd really just like to call dispatchMessage here. However, // that call goes through a bunch of checks that knock about // 10% of the performance. Since this direct-ack stuff is all @@ -2753,9 +2748,8 @@ public class Connection implements Runnable { private void processInputBuffer() throws ConnectionException, IOException { inputBuffer.flip(); - try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) { - final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer(); - + synchronized (ioFilter.getSynchObject()) { + ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer); peerDataBuffer.flip(); boolean done = false; diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java index 42ecf04..48eb984 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java @@ -26,7 +26,6 @@ import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.internal.Assert; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.net.BufferPool; -import org.apache.geode.internal.net.ByteBufferSharing; import org.apache.geode.internal.net.NioFilter; import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -55,8 +54,8 @@ public class MsgReader { } Header readHeader() throws IOException { - try (final ByteBufferSharing sharedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES)) { - ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer(); + synchronized (ioFilter.getSynchObject()) { + ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES); Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES); @@ -90,8 +89,8 @@ public class MsgReader { */ DistributionMessage readMessage(Header header) throws IOException, ClassNotFoundException { - try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) { - ByteBuffer nioInputBuffer = sharedBuffer.getBuffer(); + synchronized (ioFilter.getSynchObject()) { + ByteBuffer nioInputBuffer = readAtLeast(header.messageLength); Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength); this.getStats().incMessagesBeingReceived(true, header.messageLength); long startSer = this.getStats().startMsgDeserialization(); @@ -113,8 +112,8 @@ public class MsgReader { void readChunk(Header header, MsgDestreamer md) throws IOException { - try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) { - ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer(); + synchronized (ioFilter.getSynchObject()) { + ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength); this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength); md.addChunk(unwrappedBuffer, header.messageLength); // show that the bytes have been consumed by adjusting the buffer's position @@ -124,7 +123,7 @@ public class MsgReader { - private ByteBufferSharing readAtLeast(int bytes) throws IOException { + private ByteBuffer readAtLeast(int bytes) throws IOException { peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData, BufferPool.BufferType.TRACKED_RECEIVER); return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData); diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java deleted file mode 100644 index bb5a75f..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.net; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; - -import org.junit.Before; -import org.junit.Test; - -public class ByteBufferSharingImplTest { - - private ByteBufferSharingImpl sharing; - private BufferPool poolMock; - private CountDownLatch clientHasOpenedResource; - private CountDownLatch clientMayComplete; - - @Before - public void before() { - poolMock = mock(BufferPool.class); - sharing = - new ByteBufferSharingImpl(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER, - poolMock); - clientHasOpenedResource = new CountDownLatch(1); - clientMayComplete = new CountDownLatch(1); - } - - @Test - public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException { - resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> { - try (final ByteBufferSharing _unused = sharing.open()) { - } - }); - } - - @Test - public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException { - resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> { - final ByteBufferSharing sharing2 = sharing.open(); - sharing2.close(); - verify(poolMock, times(0)).releaseBuffer(any(), any()); - assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); - verify(poolMock, times(0)).releaseBuffer(any(), any()); - }); - } - - @Test - public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException { - clientIsLastReferenceHolder("client with balanced close calls", () -> { - try (final ByteBufferSharing _unused = sharing.open()) { - clientHasOpenedResource.countDown(); - blockClient(); - } - }); - } - - @Test - public void extraCloseClientIsLastReferenceHolder() throws InterruptedException { - clientIsLastReferenceHolder("client with extra close calls", () -> { - final ByteBufferSharing sharing2 = sharing.open(); - clientHasOpenedResource.countDown(); - blockClient(); - sharing2.close(); - verify(poolMock, times(1)).releaseBuffer(any(), any()); - assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); - System.out.println("here"); - }); - } - - @Test - public void extraCloseDoesNotPrematurelyReturnBufferToPool() { - final ByteBufferSharing sharing2 = sharing.open(); - sharing2.close(); - assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); - verify(poolMock, times(0)).releaseBuffer(any(), any()); - sharing.destruct(); - verify(poolMock, times(1)).releaseBuffer(any(), any()); - } - - @Test - public void extraCloseDoesNotDecrementRefCount() { - final ByteBufferSharing sharing2 = sharing.open(); - sharing2.close(); - assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); - final ByteBufferSharing sharing3 = this.sharing.open(); - sharing.destruct(); - verify(poolMock, times(0)).releaseBuffer(any(), any()); - } - - private void resourceOwnerIsLastReferenceHolder(final String name, final Runnable client) - throws InterruptedException { - /* - * Thread.currentThread() is thread is playing the role of the (ByteBuffer) resource owner - */ - - /* - * clientThread thread is playing the role of the client (of the resource owner) - */ - final Thread clientThread = new Thread(client, name); - clientThread.start(); - clientThread.join(); - - verify(poolMock, times(0)).releaseBuffer(any(), any()); - - sharing.destruct(); - - verify(poolMock, times(1)).releaseBuffer(any(), any()); - } - - private void clientIsLastReferenceHolder(final String name, final Runnable client) - throws InterruptedException { - /* - * Thread.currentThread() is thread is playing the role of the (ByteBuffer) resource owner - */ - - /* - * clientThread thread is playing the role of the client (of the resource owner) - */ - final Thread clientThread = new Thread(client, name); - clientThread.start(); - - clientHasOpenedResource.await(); - - sharing.destruct(); - - verify(poolMock, times(0)).releaseBuffer(any(), any()); - - clientMayComplete.countDown(); // let client finish - - clientThread.join(); - - verify(poolMock, times(1)).releaseBuffer(any(), any()); - } - - private void blockClient() { - try { - clientMayComplete.await(); - } catch (InterruptedException e) { - fail("test client thread interrupted: " + e); - } - } - -} diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java index 7ab838c..3d394fb 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java @@ -50,8 +50,7 @@ public class NioPlainEngineTest { public void unwrap() { ByteBuffer buffer = ByteBuffer.allocate(100); buffer.position(0).limit(buffer.capacity()); - try (final ByteBufferSharing unused = nioEngine.unwrap(buffer)) { - } + nioEngine.unwrap(buffer); assertThat(buffer.position()).isEqualTo(buffer.limit()); } @@ -117,29 +116,23 @@ public class NioPlainEngineTest { nioEngine.lastReadPosition = 10; - try (final ByteBufferSharing sharedBuffer = - nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { - ByteBuffer data = sharedBuffer.getBuffer(); - verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); - assertThat(data.position()).isEqualTo(0); - assertThat(data.limit()).isEqualTo(amountToRead); - assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes); - assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead); - } - - try (final ByteBufferSharing sharedBuffer = - nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { - final ByteBuffer data = sharedBuffer.getBuffer(); - verify(mockChannel, times(5)).read(any(ByteBuffer.class)); - // at end of last readAtLeast data - assertThat(data.position()).isEqualTo(amountToRead); - // we read amountToRead bytes - assertThat(data.limit()).isEqualTo(amountToRead * 2); - // we did 2 more reads from the network - assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes); - // the next read will start at the end of consumed data - assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2); - } + ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); + verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); + assertThat(data.position()).isEqualTo(0); + assertThat(data.limit()).isEqualTo(amountToRead); + assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes); + assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead); + + data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); + verify(mockChannel, times(5)).read(any(ByteBuffer.class)); + // at end of last readAtLeast data + assertThat(data.position()).isEqualTo(amountToRead); + // we read amountToRead bytes + assertThat(data.limit()).isEqualTo(amountToRead * 2); + // we did 2 more reads from the network + assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes); + // the next read will start at the end of consumed data + assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2); } @@ -154,9 +147,7 @@ public class NioPlainEngineTest { nioEngine.lastReadPosition = 10; - try (final ByteBufferSharing unused = - nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { - } + nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java index 2f9a88a..88e4f31 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java @@ -69,7 +69,6 @@ public class NioSslEngineTest { private DMStats mockStats; private NioSslEngine nioSslEngine; private NioSslEngine spyNioSslEngine; - private BufferPool spyBufferPool; @Before public void setUp() throws Exception { @@ -82,17 +81,13 @@ public class NioSslEngineTest { mockStats = mock(DMStats.class); - final BufferPool bufferPool = new BufferPool(mockStats); - spyBufferPool = spy(bufferPool); - nioSslEngine = new NioSslEngine(mockEngine, spyBufferPool); + nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats)); spyNioSslEngine = spy(nioSslEngine); } @Test - public void engineUsesDirectBuffers() throws IOException { - try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { - assertThat(outputSharing.getBuffer().isDirect()).isTrue(); - } + public void engineUsesDirectBuffers() { + assertThat(nioSslEngine.myNetData.isDirect()).isTrue(); } @Test @@ -124,7 +119,7 @@ public class NioSslEngineTest { verify(mockEngine, atLeast(2)).getHandshakeStatus(); verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class)); verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class)); - verify(spyBufferPool, times(2)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class), + verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class), any(ByteBuffer.class), any(Integer.class)); verify(spyNioSslEngine, times(1)).handleBlockingTasks(); verify(mockChannel, times(3)).read(any(ByteBuffer.class)); @@ -188,173 +183,171 @@ public class NioSslEngineTest { .hasMessageContaining("SSL Handshake terminated with status"); } + + @Test + public void checkClosed() throws Exception { + nioSslEngine.checkClosed(); + } + + @Test(expected = IOException.class) + public void checkClosedThrows() throws Exception { + when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn( + new SSLEngineResult(CLOSED, FINISHED, 0, 100)); + nioSslEngine.close(mock(SocketChannel.class)); + nioSslEngine.checkClosed(); + } + + @Test + public void synchObjectIsSelf() { + // for thread-safety the synchronization object given to outside entities + // must be the the engine itself. This allows external manipulation or + // use of the engine's buffers to be protected in the same way as its synchronized + // methods + assertThat(nioSslEngine.getSynchObject()).isSameAs(nioSslEngine); + } + @Test public void wrap() throws Exception { - try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { - - // make the application data too big to fit into the engine's encryption buffer - ByteBuffer appData = - ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100); - byte[] appBytes = new byte[appData.capacity()]; - Arrays.fill(appBytes, (byte) 0x1F); - appData.put(appBytes); - appData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted - // buffer - TestSSLEngine testEngine = new TestSSLEngine(); - testEngine.addReturnResult( - new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining())); - spyNioSslEngine.engine = testEngine; - - try (final ByteBufferSharing outputSharing2 = spyNioSslEngine.wrap(appData)) { - ByteBuffer wrappedBuffer = outputSharing2.getBuffer(); - - verify(spyBufferPool, times(1)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class), - any(ByteBuffer.class), any(Integer.class)); - appData.flip(); - assertThat(wrappedBuffer).isEqualTo(appData); - } - verify(spyNioSslEngine, times(1)).handleBlockingTasks(); - } + // make the application data too big to fit into the engine's encryption buffer + ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100); + byte[] appBytes = new byte[appData.capacity()]; + Arrays.fill(appBytes, (byte) 0x1F); + appData.put(appBytes); + appData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted buffer + TestSSLEngine testEngine = new TestSSLEngine(); + testEngine.addReturnResult( + new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining())); + spyNioSslEngine.engine = testEngine; + + ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData); + + verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class), + any(ByteBuffer.class), any(Integer.class)); + appData.flip(); + assertThat(wrappedBuffer).isEqualTo(appData); + verify(spyNioSslEngine, times(1)).handleBlockingTasks(); } @Test - public void wrapFails() throws IOException { - try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { - // make the application data too big to fit into the engine's encryption buffer - ByteBuffer appData = - ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100); - byte[] appBytes = new byte[appData.capacity()]; - Arrays.fill(appBytes, (byte) 0x1F); - appData.put(appBytes); - appData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted - // buffer - TestSSLEngine testEngine = new TestSSLEngine(); - testEngine.addReturnResult( - new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining())); - spyNioSslEngine.engine = testEngine; - - assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class) - .hasMessageContaining("Error encrypting data"); - } + public void wrapFails() { + // make the application data too big to fit into the engine's encryption buffer + ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100); + byte[] appBytes = new byte[appData.capacity()]; + Arrays.fill(appBytes, (byte) 0x1F); + appData.put(appBytes); + appData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted buffer + TestSSLEngine testEngine = new TestSSLEngine(); + testEngine.addReturnResult( + new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining())); + spyNioSslEngine.engine = testEngine; + + assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class) + .hasMessageContaining("Error encrypting data"); } @Test public void unwrapWithBufferOverflow() throws Exception { - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - // make the application data too big to fit into the engine's encryption buffer - final ByteBuffer peerAppData = inputSharing.getBuffer(); - - int originalPeerAppDataCapacity = peerAppData.capacity(); - int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2; - peerAppData.position(originalPeerAppDataPosition); - ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100); - byte[] netBytes = new byte[wrappedData.capacity()]; - Arrays.fill(netBytes, (byte) 0x1F); - wrappedData.put(netBytes); - wrappedData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted - // buffer - TestSSLEngine testEngine = new TestSSLEngine(); - spyNioSslEngine.engine = testEngine; - - testEngine.addReturnResult( - new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer - new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes - new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes - new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length)); - - int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition; - expectedCapacity = - 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition; - expectedCapacity = - 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition; - try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) { - ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer(); - unwrappedBuffer.flip(); - assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity); - } - } + // make the application data too big to fit into the engine's encryption buffer + int originalPeerAppDataCapacity = nioSslEngine.peerAppData.capacity(); + int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2; + nioSslEngine.peerAppData.position(originalPeerAppDataPosition); + ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100); + byte[] netBytes = new byte[wrappedData.capacity()]; + Arrays.fill(netBytes, (byte) 0x1F); + wrappedData.put(netBytes); + wrappedData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted buffer + TestSSLEngine testEngine = new TestSSLEngine(); + spyNioSslEngine.engine = testEngine; + + testEngine.addReturnResult( + new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer + new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes + new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes + new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length)); + + int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition; + expectedCapacity = + 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition; + expectedCapacity = + 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition; + ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData); + unwrappedBuffer.flip(); + assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity); } @Test public void unwrapWithBufferUnderflow() throws Exception { - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - ByteBuffer wrappedData = - ByteBuffer.allocate(inputSharing.getBuffer().capacity()); - byte[] netBytes = new byte[wrappedData.capacity() / 2]; - Arrays.fill(netBytes, (byte) 0x1F); - wrappedData.put(netBytes); - wrappedData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted - // buffer - TestSSLEngine testEngine = new TestSSLEngine(); - testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0)); - spyNioSslEngine.engine = testEngine; - - try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) { - ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer(); - unwrappedBuffer.flip(); - assertThat(unwrappedBuffer.remaining()).isEqualTo(0); - } - assertThat(wrappedData.position()).isEqualTo(netBytes.length); - } + ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity()); + byte[] netBytes = new byte[wrappedData.capacity() / 2]; + Arrays.fill(netBytes, (byte) 0x1F); + wrappedData.put(netBytes); + wrappedData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted buffer + TestSSLEngine testEngine = new TestSSLEngine(); + testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0)); + spyNioSslEngine.engine = testEngine; + + ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData); + unwrappedBuffer.flip(); + assertThat(unwrappedBuffer.remaining()).isEqualTo(0); + assertThat(wrappedData.position()).isEqualTo(netBytes.length); } @Test - public void unwrapWithDecryptionError() throws IOException { - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - // make the application data too big to fit into the engine's encryption buffer - ByteBuffer wrappedData = - ByteBuffer.allocate(inputSharing.getBuffer().capacity()); - byte[] netBytes = new byte[wrappedData.capacity() / 2]; - Arrays.fill(netBytes, (byte) 0x1F); - wrappedData.put(netBytes); - wrappedData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted - // buffer - TestSSLEngine testEngine = new TestSSLEngine(); - testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0)); - spyNioSslEngine.engine = testEngine; - - assertThatThrownBy(() -> { - try (final ByteBufferSharing unused = spyNioSslEngine.unwrap(wrappedData)) { - } - }).isInstanceOf(SSLException.class) - .hasMessageContaining("Error decrypting data"); - } + public void unwrapWithDecryptionError() { + // make the application data too big to fit into the engine's encryption buffer + ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity()); + byte[] netBytes = new byte[wrappedData.capacity() / 2]; + Arrays.fill(netBytes, (byte) 0x1F); + wrappedData.put(netBytes); + wrappedData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted buffer + TestSSLEngine testEngine = new TestSSLEngine(); + testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0)); + spyNioSslEngine.engine = testEngine; + + assertThatThrownBy(() -> spyNioSslEngine.unwrap(wrappedData)).isInstanceOf(SSLException.class) + .hasMessageContaining("Error decrypting data"); + } + + @Test + public void ensureUnwrappedCapacity() { + ByteBuffer wrappedBuffer = ByteBuffer.allocate(netBufferSize); + int requestedCapacity = nioSslEngine.getUnwrappedBuffer(wrappedBuffer).capacity() * 2; + ByteBuffer unwrappedBuffer = nioSslEngine.ensureUnwrappedCapacity(requestedCapacity); + assertThat(unwrappedBuffer.capacity()).isGreaterThanOrEqualTo(requestedCapacity); } @Test public void unwrapWithClosedEngineButDataInDecryptedBuffer() throws IOException { - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - // make the application data too big to fit into the engine's encryption buffer - ByteBuffer wrappedData = - ByteBuffer.allocate(inputSharing.getBuffer().capacity()); - byte[] netBytes = new byte[wrappedData.capacity() / 2]; - Arrays.fill(netBytes, (byte) 0x1F); - wrappedData.put(netBytes); - wrappedData.flip(); - final int arbitraryAmountOfRealData = 31; // bytes - inputSharing.getBuffer().position(arbitraryAmountOfRealData); - - // create an engine that will transfer bytes from the application buffer to the encrypted - // buffer - TestSSLEngine testEngine = new TestSSLEngine(); - testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0)); - spyNioSslEngine.engine = testEngine; - - try (final ByteBufferSharing unused = spyNioSslEngine.unwrap(wrappedData)) { - assertThat(inputSharing.getBuffer().position()).isEqualTo(arbitraryAmountOfRealData); - } - } + final ByteBuffer unwrappedBuffer = nioSslEngine.getUnwrappedBuffer(null); + // make the application data too big to fit into the engine's encryption buffer + ByteBuffer wrappedData = + ByteBuffer.allocate(unwrappedBuffer.capacity()); + byte[] netBytes = new byte[wrappedData.capacity() / 2]; + Arrays.fill(netBytes, (byte) 0x1F); + wrappedData.put(netBytes); + wrappedData.flip(); + final int arbitraryAmountOfRealData = 31; // bytes + unwrappedBuffer.position(arbitraryAmountOfRealData); + + // create an engine that will transfer bytes from the application buffer to the encrypted + // buffer + TestSSLEngine testEngine = new TestSSLEngine(); + testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0)); + spyNioSslEngine.engine = testEngine; + + final ByteBuffer unwrappedBuffer2 = spyNioSslEngine.unwrap(wrappedData); + assertThat(unwrappedBuffer2.position()).isEqualTo(arbitraryAmountOfRealData); } @Test @@ -368,11 +361,7 @@ public class NioSslEngineTest { when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn( new SSLEngineResult(CLOSED, FINISHED, 0, 0)); nioSslEngine.close(mockChannel); - assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer()) - .isInstanceOf(IOException.class) - .hasMessageContaining("NioSslEngine has been closed"); - assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer()) - .isInstanceOf(IOException.class) + assertThatThrownBy(() -> nioSslEngine.checkClosed()).isInstanceOf(IOException.class) .hasMessageContaining("NioSslEngine has been closed"); nioSslEngine.close(mockChannel); } @@ -401,12 +390,10 @@ public class NioSslEngineTest { when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE); when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> { - try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { - // give the NioSslEngine something to write on its socket channel, simulating a TLS close - // message - outputSharing.getBuffer().put("Goodbye cruel world".getBytes()); - return new SSLEngineResult(CLOSED, FINISHED, 0, 0); - } + // give the NioSslEngine something to write on its socket channel, simulating a TLS close + // message + nioSslEngine.myNetData.put("Goodbye cruel world".getBytes()); + return new SSLEngineResult(CLOSED, FINISHED, 0, 0); }); when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException()); nioSslEngine.close(mockChannel); @@ -437,42 +424,37 @@ public class NioSslEngineTest { ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000); SocketChannel mockChannel = mock(SocketChannel.class); - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - // force a compaction by making the decoded buffer appear near to being full - ByteBuffer unwrappedBuffer = inputSharing.getBuffer(); - unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead); - unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes); - - // simulate some socket reads - when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { - @Override - public Integer answer(InvocationOnMock invocation) throws Throwable { - ByteBuffer buffer = invocation.getArgument(0); - buffer.position(buffer.position() + individualRead); - return individualRead; - } - }); - - TestSSLEngine testSSLEngine = new TestSSLEngine(); - testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); - nioSslEngine.engine = testSSLEngine; - - try (final ByteBufferSharing sharedBuffer = - nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { - ByteBuffer data = sharedBuffer.getBuffer(); - verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); - assertThat(data.position()).isEqualTo(0); - assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); + // force a compaction by making the decoded buffer appear near to being full + ByteBuffer unwrappedBuffer = nioSslEngine.peerAppData; + unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead); + unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes); + + // simulate some socket reads + when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + ByteBuffer buffer = invocation.getArgument(0); + buffer.position(buffer.position() + individualRead); + return individualRead; } - } + }); + + TestSSLEngine testSSLEngine = new TestSSLEngine(); + testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); + nioSslEngine.engine = testSSLEngine; + + ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); + verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); + assertThat(data.position()).isEqualTo(0); + assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); } /** - * This tests the case where a message header has been read and part of a message has been read, - * but the decoded buffer is too small to hold all of the message. In this case the readAtLeast - * method will have to expand the capacity of the decoded buffer and return the new, expanded, - * buffer as the method result. + * This tests the case where a message header has been read and part of a message has been + * read, but the decoded buffer is too small to hold all of the message. In this case + * the readAtLeast method will have to expand the capacity of the decoded buffer and return + * the new, expanded, buffer as the method result. */ @Test public void readAtLeastUsingSmallAppBuffer() throws Exception { @@ -486,11 +468,7 @@ public class NioSslEngineTest { int initialUnwrappedBufferSize = 100; ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize); unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored - - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing; - inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer); - } + nioSslEngine.peerAppData = unwrappedBuffer; // simulate some socket reads when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { @@ -510,26 +488,22 @@ public class NioSslEngineTest { new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190 nioSslEngine.engine = testSSLEngine; - try (final ByteBufferSharing sharedBuffer = - nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { - ByteBuffer data = sharedBuffer.getBuffer(); - verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); - assertThat(data.position()).isEqualTo(0); - assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); - // The initial available space in the unwrapped buffer should have doubled - int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes; - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - assertThat(inputSharing.getBuffer().capacity()) - .isEqualTo(2 * initialFreeSpace + preexistingBytes); - } - } + ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); + verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); + assertThat(data.position()).isEqualTo(0); + assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); + // The initial available space in the unwrapped buffer should have doubled + int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes; + assertThat(nioSslEngine.peerAppData.capacity()) + .isEqualTo(2 * initialFreeSpace + preexistingBytes); } /** - * This tests the case where a message header has been read and part of a message has been read, - * but the decoded buffer is too small to hold all of the message. In this case the buffer is - * completely full and should only take one overflow response to resolve the problem. + * This tests the case where a message header has been read and part of a message has been + * read, but the decoded buffer is too small to hold all of the message. In this case + * the buffer is completely full and should only take one overflow response to resolve + * the problem. */ @Test public void readAtLeastUsingSmallAppBufferAtWriteLimit() throws Exception { @@ -544,10 +518,7 @@ public class NioSslEngineTest { // force buffer expansion by making a small decoded buffer appear near to being full ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize); unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing; - inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer); - } + nioSslEngine.peerAppData = unwrappedBuffer; // simulate some socket reads when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { @@ -567,14 +538,11 @@ public class NioSslEngineTest { new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); nioSslEngine.engine = testSSLEngine; - try (final ByteBufferSharing sharedBuffer = - nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { - ByteBuffer data = sharedBuffer.getBuffer(); - verify(mockChannel, times(1)).read(isA(ByteBuffer.class)); - assertThat(data.position()).isEqualTo(0); - assertThat(data.limit()) - .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes); - } + ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); + verify(mockChannel, times(1)).read(isA(ByteBuffer.class)); + assertThat(data.position()).isEqualTo(0); + assertThat(data.limit()) + .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes); } @@ -713,8 +681,8 @@ public class NioSslEngineTest { } /** - * add an engine operation result to be returned by wrap or unwrap. Like Mockito's thenReturn(), - * the last return result will repeat forever + * add an engine operation result to be returned by wrap or unwrap. + * Like Mockito's thenReturn(), the last return result will repeat forever */ void addReturnResult(SSLEngineResult... sslEngineResult) { for (SSLEngineResult result : sslEngineResult) {