[
https://issues.apache.org/jira/browse/KAFKA-7454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16632826#comment-16632826
]
ASF GitHub Bot commented on KAFKA-7454:
---------------------------------------
ijuma closed pull request #5713: KAFKA-7454: Use lazy allocation for
SslTransportLayer buffers
URL: https://github.com/apache/kafka/pull/5713
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 917e5a211f7..a5ff06d9ada 100644
---
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -47,6 +47,7 @@
*/
public class SslTransportLayer implements TransportLayer {
private enum State {
+ NOT_INITALIZED,
HANDSHAKE,
HANDSHAKE_FAILED,
READY,
@@ -70,9 +71,7 @@
private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
public static SslTransportLayer create(String channelId, SelectionKey key,
SSLEngine sslEngine) throws IOException {
- SslTransportLayer transportLayer = new SslTransportLayer(channelId,
key, sslEngine);
- transportLayer.startHandshake();
- return transportLayer;
+ return new SslTransportLayer(channelId, key, sslEngine);
}
// Prefer `create`, only use this in tests
@@ -81,6 +80,7 @@ public static SslTransportLayer create(String channelId,
SelectionKey key, SSLEn
this.key = key;
this.socketChannel = (SocketChannel) key.channel();
this.sslEngine = sslEngine;
+ this.state = State.NOT_INITALIZED;
final LogContext logContext = new
LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId,
key));
this.log = logContext.logger(getClass());
@@ -88,7 +88,7 @@ public static SslTransportLayer create(String channelId,
SelectionKey key, SSLEn
// Visible for testing
protected void startHandshake() throws IOException {
- if (state != null)
+ if (state != State.NOT_INITALIZED)
throw new IllegalStateException("startHandshake() can only be
called once, state " + state);
this.netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
@@ -156,11 +156,12 @@ public boolean isConnected() {
*/
@Override
public void close() throws IOException {
+ State prevState = state;
if (state == State.CLOSING) return;
state = State.CLOSING;
sslEngine.closeOutbound();
try {
- if (isConnected()) {
+ if (prevState != State.NOT_INITALIZED && isConnected()) {
if (!flush(netWriteBuffer)) {
throw new IOException("Remaining data in the network
buffer, can't send SSL close message.");
}
@@ -181,6 +182,9 @@ public void close() throws IOException {
} finally {
socketChannel.socket().close();
socketChannel.close();
+ netReadBuffer = null;
+ netWriteBuffer = null;
+ appReadBuffer = null;
}
}
@@ -242,6 +246,8 @@ protected boolean flush(ByteBuffer buf) throws IOException {
*/
@Override
public void handshake() throws IOException {
+ if (state == State.NOT_INITALIZED)
+ startHandshake();
if (state == State.READY)
throw renegotiationException();
if (state == State.CLOSING)
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 3bdb07a87c3..1f9739bf762 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -318,7 +318,6 @@ protected SslTransportLayer buildTransportLayer(SslFactory
sslFactory, String id
SocketChannel socketChannel = (SocketChannel) key.channel();
SSLEngine sslEngine = sslFactory.createSslEngine(host,
socketChannel.socket().getPort());
TestSslTransportLayer transportLayer = new
TestSslTransportLayer(id, key, sslEngine);
- transportLayer.startHandshake();
return transportLayer;
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 67834381764..1b8a5fd7245 100644
---
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -1073,7 +1073,6 @@ protected SslTransportLayer
buildTransportLayer(SslFactory sslFactory, String id
SocketChannel socketChannel = (SocketChannel) key.channel();
SSLEngine sslEngine = sslFactory.createSslEngine(host,
socketChannel.socket().getPort());
TestSslTransportLayer transportLayer = newTransportLayer(id, key,
sslEngine);
- transportLayer.startHandshake();
return transportLayer;
}
@@ -1148,6 +1147,12 @@ else if (numDelayedFlushesRemaining.getAndDecrement() !=
0)
return super.flush(buf);
}
+ @Override
+ protected void startHandshake() throws IOException {
+ assertTrue("SSL handshake initialized too early",
socketChannel().isConnected());
+ super.startHandshake();
+ }
+
private void resetDelayedFlush() {
numDelayedFlushesRemaining.set(flushDelayCount);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Use lazy allocation for SslTransportLayer buffers
> -------------------------------------------------
>
> Key: KAFKA-7454
> URL: https://issues.apache.org/jira/browse/KAFKA-7454
> Project: Kafka
> Issue Type: Improvement
> Components: security
> Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.1.0
>
>
> At the moment, three heap buffers are allocated for each SslTransportLayer at
> the time the instance is created (before establishing the connection on the
> client-side and when accepting the connection on the broker-side). When there
> are a large number of connections and the broker is overloaded, client
> reconnections can put unnecessary memory pressure on the broker, since
> buffers may be allocated for client connections whose handshake is never
> processed. It would be better to allocate the buffers lazily to reduce memory
> pressure. On the broker-side, buffers will be allocated when the first packet
> is received from the client, starting the handshake. On the client-side,
> buffers will be allocated once the connection is established, when the client
> initiates the handshake.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)