dschneider-pivotal commented on a change in pull request #6156:
URL: https://github.com/apache/geode/pull/6156#discussion_r650281404
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
##########
@@ -882,6 +1000,12 @@ void readPayloadFields(final int numParts, final int len)
throws IOException {
int readSecurePart = checkAndSetSecurityPart();
int bytesRemaining = len;
+
+ if (this.ioFilter != null) {
Review comment:
In some cases we have a filter even if not using SSL, NioPlainEngine.
What is the difference between ioFilter being null and it being an instance of
NioPlainEngine? Would it be possible to have it never be null? If so, it would
be nice to get rid of all these null checks
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
##########
@@ -415,5 +417,29 @@ public void write(ByteBufferInputStream.ByteSource source)
{
source.sendTo(this.buffer);
}
+ private void flushBuffer(SocketChannel sc, ByteBuffer out, NioFilter
ioFilter)
+ throws IOException {
+ if (out.position() == 0)
+ return;
+ out.flip();
+ writeToSC(sc, out, ioFilter);
+ out.clear();
+ }
+
+ private void writeToSC(SocketChannel sc, ByteBuffer out, NioFilter ioFilter)
Review comment:
rename to "writeToSocketChannel"
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
##########
@@ -1455,22 +1478,24 @@ private void handleNewClientConnection(final Socket
socket) throws IOException {
"Rejected connection from {} because current connection count of
{} is greater than or equal to the configured max of {}",
new Object[] {socket.getInetAddress(), curCnt,
maxConnections});
+
try {
refuseHandshake(socket.getOutputStream(),
String.format("exceeded max-connections %s",
maxConnections),
- REPLY_REFUSED);
+ REPLY_REFUSED, ioFilter, socket);
Review comment:
since you are now passing "socket" into refuseHandshake, you no longer
need to also pass in socket.getOutputStream()
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
##########
@@ -967,7 +981,8 @@ private void checkForStuckKeys() {
tmpSel.selectNow(); // clear canceled key
sc.registerWithSelector2(selector);
} else {
- if (tmpsk.isValid() && tmpsk.isReadable()) {
+ if (tmpsk.isValid() && (tmpsk.isReadable() || tmpsk.isWritable()))
{
Review comment:
Why did you add isWritable? I think the old code only used non-blocking
on reads. Are you also now doing non-blocking writes?
Note this common expression (line 984 and 1129) could be extracted into a
method
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
##########
@@ -415,5 +417,29 @@ public void write(ByteBufferInputStream.ByteSource source)
{
source.sendTo(this.buffer);
}
+ private void flushBuffer(SocketChannel sc, ByteBuffer out, NioFilter
ioFilter)
Review comment:
the old code called flushBuffer on the parent class
BufferDataOutputStream. Would it be possible to add the ioFilter to that
method? It looks to me like all callers are in HeapDataOutputStream so making
it a private method in this class is good and you could just remove the unused
one in BufferDataOutputStream
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java
##########
@@ -47,51 +49,67 @@
private KnownVersion clientVersion;
private DataInputStream dataInputStream;
private DataOutputStream dataOutputStream;
+ private final NioFilter ioFilter;
- ClientRegistrationMetadata(final InternalCache cache, final Socket socket) {
+ ClientRegistrationMetadata(final InternalCache cache, final Socket socket,
+ final NioFilter ioFilter) {
this.cache = cache;
this.socket = socket;
socketMessageWriter = new SocketMessageWriter();
+ this.ioFilter = ioFilter;
}
boolean initialize() throws IOException {
- DataInputStream unversionedDataInputStream = new
DataInputStream(socket.getInputStream());
- DataOutputStream unversionedDataOutputStream = new
DataOutputStream(socket.getOutputStream());
-
- if (getAndValidateClientVersion(socket, unversionedDataInputStream,
- unversionedDataOutputStream)) {
- if (oldClientRequiresVersionedStreams(clientVersion)) {
- dataInputStream =
- new VersionedDataInputStream(unversionedDataInputStream,
clientVersion);
- dataOutputStream =
- new VersionedDataOutputStream(unversionedDataOutputStream,
clientVersion);
+ InputStream inputStream = null;
+ try {
+ if (ioFilter == null) {
+ inputStream = socket.getInputStream();
} else {
- dataInputStream = unversionedDataInputStream;
- dataOutputStream = unversionedDataOutputStream;
+ inputStream = ioFilter.getInputStream(socket);
Review comment:
shouldn't the initialization of inputStream happen right before the try
(instead of after) since all the finally block does is close the inputStream?
Also if you then extracted these lines of code into a method you could then
have: final InputStream inputStream = getInputStream(socket, ioFilter); try ...
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
##########
@@ -1424,10 +1439,18 @@ private void handleNewClientConnection(final Socket
socket) throws IOException {
// for 'server to client' communication, send it to the CacheClientNotifier
// for processing.
final CommunicationMode communicationMode;
+ final NioFilter ioFilter;
try {
if (isSelector()) {
- communicationMode = getCommunicationModeForSelector(socket);
+ if (socketCreator.forCluster().useSSL()) {
+ ioFilter = createNIOSSLEngine(socket.getChannel());
+ communicationMode = getCommunicationModeForSSLSelector(socket,
ioFilter);
+ } else {
+ ioFilter = new NioPlainEngine(bufferPool);
+ communicationMode = getCommunicationModeForSelector(socket);
Review comment:
would calling getCommunicationModeForSSLSelector with our NioPlainEngine
ioFilter work here? If so we could then get rid of the old
getCommunicationModeForSelector and just have the one that takes both a socket
and an ioFilter
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]