mivanac commented on a change in pull request #6156:
URL: https://github.com/apache/geode/pull/6156#discussion_r650406449
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
##########
@@ -967,7 +981,8 @@ private void checkForStuckKeys() {
tmpSel.selectNow(); // clear canceled key
sc.registerWithSelector2(selector);
} else {
- if (tmpsk.isValid() && tmpsk.isReadable()) {
+ if (tmpsk.isValid() && (tmpsk.isReadable() || tmpsk.isWritable()))
{
Review comment:
Without this change, the tests fail and no connection can be established.
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java
##########
@@ -47,51 +49,67 @@
private KnownVersion clientVersion;
private DataInputStream dataInputStream;
private DataOutputStream dataOutputStream;
+ private final NioFilter ioFilter;
- ClientRegistrationMetadata(final InternalCache cache, final Socket socket) {
+ ClientRegistrationMetadata(final InternalCache cache, final Socket socket,
+ final NioFilter ioFilter) {
this.cache = cache;
this.socket = socket;
socketMessageWriter = new SocketMessageWriter();
+ this.ioFilter = ioFilter;
}
boolean initialize() throws IOException {
- DataInputStream unversionedDataInputStream = new
DataInputStream(socket.getInputStream());
- DataOutputStream unversionedDataOutputStream = new
DataOutputStream(socket.getOutputStream());
-
- if (getAndValidateClientVersion(socket, unversionedDataInputStream,
- unversionedDataOutputStream)) {
- if (oldClientRequiresVersionedStreams(clientVersion)) {
- dataInputStream =
- new VersionedDataInputStream(unversionedDataInputStream,
clientVersion);
- dataOutputStream =
- new VersionedDataOutputStream(unversionedDataOutputStream,
clientVersion);
+ InputStream inputStream = null;
+ try {
+ if (ioFilter == null) {
+ inputStream = socket.getInputStream();
} else {
- dataInputStream = unversionedDataInputStream;
- dataOutputStream = unversionedDataOutputStream;
+ inputStream = ioFilter.getInputStream(socket);
Review comment:
Updated
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
##########
@@ -1455,22 +1478,24 @@ private void handleNewClientConnection(final Socket
socket) throws IOException {
"Rejected connection from {} because current connection count of
{} is greater than or equal to the configured max of {}",
new Object[] {socket.getInetAddress(), curCnt,
maxConnections});
+
try {
refuseHandshake(socket.getOutputStream(),
String.format("exceeded max-connections %s",
maxConnections),
- REPLY_REFUSED);
+ REPLY_REFUSED, ioFilter, socket);
Review comment:
updated
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
##########
@@ -415,5 +417,29 @@ public void write(ByteBufferInputStream.ByteSource source)
{
source.sendTo(this.buffer);
}
+ private void flushBuffer(SocketChannel sc, ByteBuffer out, NioFilter
ioFilter)
Review comment:
deleted old method
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
##########
@@ -415,5 +417,29 @@ public void write(ByteBufferInputStream.ByteSource source)
{
source.sendTo(this.buffer);
}
+ private void flushBuffer(SocketChannel sc, ByteBuffer out, NioFilter
ioFilter)
+ throws IOException {
+ if (out.position() == 0)
+ return;
+ out.flip();
+ writeToSC(sc, out, ioFilter);
+ out.clear();
+ }
+
+ private void writeToSC(SocketChannel sc, ByteBuffer out, NioFilter ioFilter)
Review comment:
updated
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
##########
@@ -1424,10 +1439,18 @@ private void handleNewClientConnection(final Socket
socket) throws IOException {
// for 'server to client' communication, send it to the CacheClientNotifier
// for processing.
final CommunicationMode communicationMode;
+ final NioFilter ioFilter;
try {
if (isSelector()) {
- communicationMode = getCommunicationModeForSelector(socket);
+ if (socketCreator.forCluster().useSSL()) {
+ ioFilter = createNIOSSLEngine(socket.getChannel());
+ communicationMode = getCommunicationModeForSSLSelector(socket,
ioFilter);
+ } else {
+ ioFilter = new NioPlainEngine(bufferPool);
+ communicationMode = getCommunicationModeForSelector(socket);
Review comment:
working on it
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
##########
@@ -882,6 +1000,12 @@ void readPayloadFields(final int numParts, final int len)
throws IOException {
int readSecurePart = checkAndSetSecurityPart();
int bytesRemaining = len;
+
+ if (this.ioFilter != null) {
Review comment:
I will investigate, but I think there are cases where no ioFilter is
defined for a SocketChannel.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]