echobravopapa commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452410523
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -94,11 +96,14 @@
*
* @since GemFire 2.0
*/
-public class Connection implements Runnable {
+public class ClusterConnection implements Runnable {
Review comment:
+1 on the rename
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
##########
@@ -17,24 +17,31 @@
import java.io.EOFException;
Review comment:
should these classes be renamed since we are using OiO now?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2909,13 +2950,13 @@ private boolean readMessageHeader(ByteBuffer
peerDataBuffer) throws IOException
readerShuttingDown = true;
requestClose(String.format("Unknown P2P message type: %s",
nioMessageTypeInteger));
- return true;
+ return false;
Review comment:
not obvious why this bool is flipped...
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2588,23 +2596,44 @@ void writeFully(SocketChannel channel, ByteBuffer
buffer, boolean forceAsync,
}
// fall through
}
- ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
- while (wrappedBuffer.remaining() > 0) {
+ while (buffer.remaining() > 0) {
int amtWritten = 0;
long start = stats.startSocketWrite(true);
try {
- amtWritten = channel.write(wrappedBuffer);
+ if (socket instanceof SSLSocket) {
+ OutputStream output = socket.getOutputStream();
+ if (buffer.hasArray()) {
+ output.write(buffer.array(), buffer.arrayOffset(),
+ buffer.limit() - buffer.position());
+ buffer.position(buffer.limit());
+ } else {
+ // socket output streams are FileOutputStreams and have a
writeable Channel.
+ // This code merely fetches that channel and writes to it.
+ // Channels.newChannel(output).write(buffer);
Review comment:
dead code...
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1155,47 @@ private Connection(ConnectionTable t, boolean
preserveOrder, InternalDistributed
InetSocketAddress addr =
new InetSocketAddress(remoteID.getInetAddress(),
remoteID.getDirectChannelPort());
- SocketChannel channel = SocketChannel.open();
- owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
- try {
- channel.socket().setTcpNoDelay(true);
- channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
+ int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+ boolean useSSL = getConduit().useSSL();
+ if (useSSL) {
+ // int socketBufferSize = -1;
+ int socketBufferSize =
+ sharedResource ? SMALL_BUFFER_SIZE :
this.owner.getConduit().tcpBufferSize;
+ socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+ new HostAndPort(remoteID.getHostName(),
remoteID.getDirectChannelPort()),
+ 0, null, false, socketBufferSize, true);
+ setSocketBufferSize(this.socket, false, socketBufferSize, true);
+ } else {
+ SocketChannel channel = SocketChannel.open();
+ socket = channel.socket();
// If conserve-sockets is false, the socket can be used for receiving
responses, so set the
// receive buffer accordingly.
if (!sharedResource) {
- setReceiveBufferSize(channel.socket(),
owner.getConduit().tcpBufferSize);
+ setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
} else {
- setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make
small since only
+ setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since
only
// receive ack messages
}
- setSendBufferSize(channel.socket());
- channel.configureBlocking(true);
+ }
+ owner.addConnectingSocket(socket, addr.getAddress());
+
+ try {
+ socket.setTcpNoDelay(true);
+ socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
- int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+ setSendBufferSize(socket);
+ if (!useSSL) {
+ socket.getChannel().configureBlocking(true);
+ }
try {
- channel.socket().connect(addr, connectTime);
-
- createIoFilter(channel, true);
+ if (!useSSL) {
+ // haven't connected yet
+ socket.connect(addr, connectTime);
+ }
+ configureInputStream(socket, true);
} catch (NullPointerException e) {
// jdk 1.7 sometimes throws an NPE here
Review comment:
out of scope, but kinda reads like tech debt...
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
##########
@@ -253,7 +253,7 @@ ByteBuffer expandWriteBufferIfNeeded(BufferType type,
ByteBuffer existing,
return newBuffer;
}
- ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {
+ public ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int
capacity) {
Review comment:
what required the addition of `public`?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2722,9 +2753,10 @@ public void readAck(final DirectReplyProcessor processor)
* deserialized and passed to TCPConduit for further processing
*/
private void processInputBuffer() throws ConnectionException, IOException {
+ // BRUCE: simplify this
Review comment:
do we need a story for this TODO?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -140,7 +145,7 @@
private final ConnectionTable owner;
private final TCPConduit conduit;
- private NioFilter ioFilter;
+ // private NioFilter ioFilter;
Review comment:
dead code...
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1155,47 @@ private Connection(ConnectionTable t, boolean
preserveOrder, InternalDistributed
InetSocketAddress addr =
new InetSocketAddress(remoteID.getInetAddress(),
remoteID.getDirectChannelPort());
- SocketChannel channel = SocketChannel.open();
- owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
- try {
- channel.socket().setTcpNoDelay(true);
- channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
+ int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+ boolean useSSL = getConduit().useSSL();
+ if (useSSL) {
+ // int socketBufferSize = -1;
Review comment:
dead code...
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2722,9 +2753,10 @@ public void readAck(final DirectReplyProcessor processor)
* deserialized and passed to TCPConduit for further processing
*/
private void processInputBuffer() throws ConnectionException, IOException {
+ // BRUCE: simplify this
Review comment:
or a git-hook ;)
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -3006,7 +3047,7 @@ private void readMessage(ByteBuffer peerDataBuffer) {
} catch (IOException ex) {
// ignored
}
- } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
+ } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {
Review comment:
looks dead, maybe there was `else if` once upon a time...
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
##########
@@ -202,6 +202,8 @@
private final Stopper stopper = new Stopper();
+ private boolean enableTLSOverNIO = true; //
Boolean.getBoolean("geode.enable-tls-nio");
+
Review comment:
dead code hanging out...
##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,66 @@
*/
package org.apache.geode.internal.tcp;
+import java.io.EOFException;
import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
import java.nio.ByteBuffer;
-import org.apache.logging.log4j.Logger;
-
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* This class is currently used for reading direct ack responses It should
probably be used for all
* of the reading done in Connection.
*
*/
public class MsgReader {
- private static final Logger logger = LogService.getLogger();
-
- protected final Connection conn;
+ protected final ClusterConnection conn;
+ private final BufferPool bufferPool;
protected final Header header = new Header();
- private final NioFilter ioFilter;
private ByteBuffer peerNetData;
+ private final InputStream inputStream;
private final ByteBufferInputStream byteBufferInputStream;
+ private int lastProcessedPosition;
+ private int lastReadPosition;
-
- MsgReader(Connection conn, NioFilter nioFilter, Version version) {
+ MsgReader(ClusterConnection conn, BufferPool bufferPool, InputStream
inputStream,
+ Version version) {
+ this.bufferPool = bufferPool;
this.conn = conn;
- this.ioFilter = nioFilter;
+ this.inputStream = inputStream;
this.byteBufferInputStream =
version == null ? new ByteBufferInputStream() : new
VersionedByteBufferInputStream(version);
}
Header readHeader() throws IOException {
- ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+ ByteBuffer buffer = readAtLeast(ClusterConnection.MSG_HEADER_BYTES);
- Assert.assertTrue(unwrappedBuffer.remaining() >=
Connection.MSG_HEADER_BYTES);
-
- try {
- int nioMessageLength = unwrappedBuffer.getInt();
- /* nioMessageVersion = */
- Connection.calcHdrVersion(nioMessageLength);
- nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
- byte nioMessageType = unwrappedBuffer.get();
- short nioMsgId = unwrappedBuffer.getShort();
-
- boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
- if (directAck) {
- nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
- }
+ Assert.assertTrue(buffer.remaining() >=
ClusterConnection.MSG_HEADER_BYTES);
- header.setFields(nioMessageLength, nioMessageType, nioMsgId);
+ int messageLength = buffer.getInt();
+ /* nioMessageVersion = */
Review comment:
dead?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
##########
@@ -945,6 +962,10 @@ public boolean useSSL() {
return useSSL;
}
+ public boolean useDirectReceiveBuffers() {
+ return !useSSL();
+ }
+
Review comment:
previous question might be resolved following this accessor... for
ssl/tls we use NIO and !ssl we use the directbuffers, cool and with 6 you get
eggroll
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]