Vladsz83 commented on code in PR #12634:
URL: https://github.com/apache/ignite/pull/12634#discussion_r2731896734


##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java:
##########
@@ -1194,6 +1194,7 @@ private void forceStopRead() throws InterruptedException {
 
                 try {
                     TcpDiscoveryIoSession ses = createSession(sock);
+                    ses.switchToFastReader();

Review Comment:
   My overall idea is to use a different or flagged `createSession()` so that it 
chooses which session to create: checked or fast. The same applies to the other 
places where `switchToFastReader()` is called.



##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -163,63 +176,123 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException
      * @throws IgniteCheckedException If deserialization fails.
      */
     <T> T readMessage() throws IgniteCheckedException, IOException {
-        byte serMode = (byte)in.read();
+        return reader.readMessage();
+    }
 
-        if (JAVA_SERIALIZATION == serMode)
-            return U.unmarshal(spi.marshaller(), in, clsLdr);
+    /** Switches reader to fast mode after initial validated read. */
+    public void switchToFastReader() {
+        reader = fastReader;
+    }
 
-        try {
-            if (MESSAGE_SERIALIZATION != serMode) {
-                detectSslAlert(serMode, in);
+    /** Base discovery message reader. */
+    private abstract class DiscoveryMessageReader {

Review Comment:
   Do we need this abstract class? I would use something like `DefaultReader` 
(not abstract) and `CheckedReader`.
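
   A minimal sketch of that shape, not the PR's code: `ReaderSketch`, 
`checkMode()`, the one-byte toy body and the marker value are all hypothetical 
placeholders. The concrete `DefaultReader` does the plain read, and 
`CheckedReader` only overrides the validation hook.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for the session; all names here are illustrative only. */
final class ReaderSketch {
    /** Placeholder marker value, not necessarily the one Ignite uses. */
    static final byte MESSAGE_SERIALIZATION = 2;

    /** Concrete (non-abstract) reader: reads the mode byte and then a toy one-byte body. */
    static class DefaultReader {
        /** Stream the messages are read from. */
        protected final InputStream in;

        DefaultReader(InputStream in) {
            this.in = in;
        }

        /** Reads the mode byte, lets subclasses inspect it, then returns the toy body byte. */
        int readMessage() throws IOException {
            int serMode = in.read();

            if (serMode == -1)
                throw new EOFException("Connection closed before message was fully read.");

            checkMode((byte)serMode);

            int body = in.read();

            if (body == -1)
                throw new EOFException("Connection closed before message was fully read.");

            return body;
        }

        /** The default reader performs no validation of the mode byte. */
        void checkMode(byte serMode) throws IOException {
            // No-op.
        }
    }

    /** Reader for the initial connection: rejects an unexpected mode byte. */
    static final class CheckedReader extends DefaultReader {
        CheckedReader(InputStream in) {
            super(in);
        }

        /** Fails with IOException when the mode byte is not the expected marker. */
        @Override void checkMode(byte serMode) throws IOException {
            if (serMode != MESSAGE_SERIALIZATION)
                throw new IOException("Received unexpected byte while reading discovery message: " + serMode);
        }
    }
}
```

   With this layout the shared read loop lives in one concrete class, so no 
abstract base is needed.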



##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -163,63 +176,123 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException
      * @throws IgniteCheckedException If deserialization fails.
      */
     <T> T readMessage() throws IgniteCheckedException, IOException {
-        byte serMode = (byte)in.read();
+        return reader.readMessage();
+    }
 
-        if (JAVA_SERIALIZATION == serMode)
-            return U.unmarshal(spi.marshaller(), in, clsLdr);
+    /** Switches reader to fast mode after initial validated read. */
+    public void switchToFastReader() {
+        reader = fastReader;
+    }
 
-        try {
-            if (MESSAGE_SERIALIZATION != serMode) {

Review Comment:
   If we don't want callers to know about the session implementation, we might 
create another implementation of `TcpDiscoveryIoSession` that checks the 
protocol. Not sure it is worth it. Alternatively, we might add a flag such as 
`checkCompatibility` to the session. On the first connection, we use a session 
with `checkCompatibility=true`. On the next, permanent connection, we might 
recreate the session with `checkCompatibility=false`. The flag would be used in 
the message serialization code.
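
   Roughly, the flag variant could look like the sketch below. It is a toy 
under stated assumptions: `FlaggedSessionSketch`, the two-argument 
`createSession()`, the one-byte body and the marker value are hypothetical, 
and a plain `InputStream` stands in for the socket.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical illustration of a session that carries a compatibility-check flag. */
final class FlaggedSessionSketch {
    /** Placeholder marker value, not necessarily the one Ignite uses. */
    static final byte MESSAGE_SERIALIZATION = 2;

    /** Toy session: a single class, with the check controlled by a constructor flag. */
    static final class Session {
        /** Stream the messages are read from. */
        private final InputStream in;

        /** Whether the mode byte is validated before the body is read. */
        private final boolean checkCompatibility;

        Session(InputStream in, boolean checkCompatibility) {
            this.in = in;
            this.checkCompatibility = checkCompatibility;
        }

        /** Reads the mode byte, validates it only when the flag is set, returns the toy body byte. */
        int readMessage() throws IOException {
            int serMode = in.read();

            if (checkCompatibility && serMode != MESSAGE_SERIALIZATION)
                throw new IOException("Received unexpected byte while reading discovery message: " + serMode);

            return in.read();
        }
    }

    /** Hypothetical flagged factory standing in for a flagged createSession(). */
    static Session createSession(InputStream in, boolean checkCompatibility) {
        return new Session(in, checkCompatibility);
    }
}
```

   The first connection would then call `createSession(in, true)`, and the 
permanent connection would recreate the session with `createSession(in, false)`.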



##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -163,63 +176,123 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException
      * @throws IgniteCheckedException If deserialization fails.
      */
     <T> T readMessage() throws IgniteCheckedException, IOException {
-        byte serMode = (byte)in.read();
+        return reader.readMessage();
+    }
 
-        if (JAVA_SERIALIZATION == serMode)
-            return U.unmarshal(spi.marshaller(), in, clsLdr);
+    /** Switches reader to fast mode after initial validated read. */
+    public void switchToFastReader() {
+        reader = fastReader;
+    }
 
-        try {
-            if (MESSAGE_SERIALIZATION != serMode) {
-                detectSslAlert(serMode, in);
+    /** Base discovery message reader. */
+    private abstract class DiscoveryMessageReader {
+        /**
+         * Reads the next discovery message from the socket input stream.
+         *
+         * @param <T> Type of the expected message.
+         * @return Deserialized message instance.
+         * @throws IgniteCheckedException If deserialization fails.
+         */
+        abstract <T> T readMessage() throws IgniteCheckedException, IOException;
 
-                // IOException type is important for ServerImpl. It may search the cause (X.hasCause).
-                // The connection error processing behavior depends on it.
-                throw new IOException("Received unexpected byte while reading discovery message: " + serMode);
-            }
-
-            Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));
+        /** Reads serialized discovery message body when {@link #MESSAGE_SERIALIZATION} is used. */
+        protected <T> T readDiscoveryMessageBody() throws IgniteCheckedException {
+            try {
+                Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));
 
-            msgReader.reset();
-            msgReader.setBuffer(msgBuf);
+                msgReader.reset();
+                msgReader.setBuffer(msgBuf);
 
-            MessageSerializer msgSer = spi.messageFactory().serializer(msg.directType());
+                MessageSerializer msgSer = spi.messageFactory().serializer(msg.directType());
 
-            boolean finished;
+                boolean finished;
 
-            do {
-                msgBuf.clear();
+                do {
+                    msgBuf.clear();
 
-                int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining());
+                    int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining());
 
-                if (read == -1)
-                    throw new EOFException("Connection closed before message was fully read.");
+                    if (read == -1)
+                        throw new EOFException("Connection closed before message was fully read.");
 
-                msgBuf.limit(read);
+                    msgBuf.limit(read);
 
-                finished = msgSer.readFrom(msg, msgReader);
+                    finished = msgSer.readFrom(msg, msgReader);
 
-                // Server Discovery only sends next message to next Server upon receiving a receipt for the previous one.
-                // This behaviour guarantees that we never read a next message from the buffer right after the end of
-                // the previous message. But it is not guaranteed with Client Discovery where messages aren't acknowledged.
-                // Thus, we have to keep the uprocessed bytes read from the socket. It won't return them again.
-                if (msgBuf.hasRemaining()) {
-                    byte[] unprocessedReadTail = new byte[msgBuf.remaining()];
+                    // Server Discovery only sends next message to next Server upon receiving a receipt for the previous one.
+                    // This behaviour guarantees that we never read a next message from the buffer right after the end of
+                    // the previous message. But it is not guaranteed with Client Discovery where messages aren't acknowledged.
+                    // Thus, we have to keep the uprocessed bytes read from the socket. It won't return them again.
+                    if (msgBuf.hasRemaining()) {
+                        byte[] unprocessedReadTail = new byte[msgBuf.remaining()];
 
-                    msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining());
+                        msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining());
 
-                    in.attachByteArray(unprocessedReadTail);
+                        in.attachByteArray(unprocessedReadTail);
+                    }
                 }
+                while (!finished);
+
+                return (T)msg;
             }
-            while (!finished);
+            catch (Exception e) {
+                // Keep logic similar to `U.marshal(...)`.
+                if (e instanceof IgniteCheckedException)
+                    throw (IgniteCheckedException)e;
 
-            return (T)msg;
+                throw new IgniteCheckedException(e);
+            }
         }
-        catch (Exception e) {
-            // Keep logic similar to `U.marshal(...)`.
-            if (e instanceof IgniteCheckedException)
-                throw (IgniteCheckedException)e;
 
-            throw new IgniteCheckedException(e);
+        /** Reads 4-byte header for diagnostics. */
+        protected byte[] readHeader(byte serMode) throws IOException {

Review Comment:
   It is called only from a derived class.



##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -163,63 +176,123 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException
      * @throws IgniteCheckedException If deserialization fails.
      */
     <T> T readMessage() throws IgniteCheckedException, IOException {
-        byte serMode = (byte)in.read();
+        return reader.readMessage();
+    }
 
-        if (JAVA_SERIALIZATION == serMode)
-            return U.unmarshal(spi.marshaller(), in, clsLdr);
+    /** Switches reader to fast mode after initial validated read. */
+    public void switchToFastReader() {
+        reader = fastReader;
+    }
 
-        try {
-            if (MESSAGE_SERIALIZATION != serMode) {
-                detectSslAlert(serMode, in);
+    /** Base discovery message reader. */
+    private abstract class DiscoveryMessageReader {

Review Comment:
   To me, this looks like overhead: too much new code and more complexity. We 
already have many `if`s and `switch`es in the message parsing, and I believe 
one more wouldn't change the routine. Instead of new subclasses, we might add 
a flag like `checkProtocol` to the session and just recreate the session with 
or without it.


