Vladsz83 commented on code in PR #12634:
URL: https://github.com/apache/ignite/pull/12634#discussion_r2731896734
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java:
##########
@@ -1194,6 +1194,7 @@ private void forceStopRead() throws InterruptedException {
try {
TcpDiscoveryIoSession ses = createSession(sock);
+ ses.switchToFastReader();
Review Comment:
My overal idea is to use different or flagged `createSession()` so that it
would choose which session to create checked or fast. Same in other places of
`switchToFastReader()`
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -163,63 +176,123 @@ void writeMessage(TcpDiscoveryAbstractMessage msg)
throws IgniteCheckedException
* @throws IgniteCheckedException If deserialization fails.
*/
<T> T readMessage() throws IgniteCheckedException, IOException {
- byte serMode = (byte)in.read();
+ return reader.readMessage();
+ }
- if (JAVA_SERIALIZATION == serMode)
- return U.unmarshal(spi.marshaller(), in, clsLdr);
+ /** Switches reader to fast mode after initial validated read. */
+ public void switchToFastReader() {
+ reader = fastReader;
+ }
- try {
- if (MESSAGE_SERIALIZATION != serMode) {
- detectSslAlert(serMode, in);
+ /** Base discovery message reader. */
+ private abstract class DiscoveryMessageReader {
Review Comment:
Do we need this abstract class? I would use smth. like `DefaultReader` (not
abstract) and `CheckedReader`.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -163,63 +176,123 @@ void writeMessage(TcpDiscoveryAbstractMessage msg)
throws IgniteCheckedException
* @throws IgniteCheckedException If deserialization fails.
*/
<T> T readMessage() throws IgniteCheckedException, IOException {
- byte serMode = (byte)in.read();
+ return reader.readMessage();
+ }
- if (JAVA_SERIALIZATION == serMode)
- return U.unmarshal(spi.marshaller(), in, clsLdr);
+ /** Switches reader to fast mode after initial validated read. */
+ public void switchToFastReader() {
+ reader = fastReader;
+ }
- try {
- if (MESSAGE_SERIALIZATION != serMode) {
Review Comment:
If we want not to know about the session implementation, we might create
another implementation of TcpDiscoveryIoSession which can check the protocol.
Not sure worth it. Or we might bring some flag `checkCompatibility` to the
session. At the first connection, we use session with
`checkCompatibility=true`. On next permanent connection, we might recreate
session with `checkCompatibility=false`. This flag is used in the message
serialization.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -163,63 +176,123 @@ void writeMessage(TcpDiscoveryAbstractMessage msg)
throws IgniteCheckedException
* @throws IgniteCheckedException If deserialization fails.
*/
<T> T readMessage() throws IgniteCheckedException, IOException {
- byte serMode = (byte)in.read();
+ return reader.readMessage();
+ }
- if (JAVA_SERIALIZATION == serMode)
- return U.unmarshal(spi.marshaller(), in, clsLdr);
+ /** Switches reader to fast mode after initial validated read. */
+ public void switchToFastReader() {
+ reader = fastReader;
+ }
- try {
- if (MESSAGE_SERIALIZATION != serMode) {
- detectSslAlert(serMode, in);
+ /** Base discovery message reader. */
+ private abstract class DiscoveryMessageReader {
+ /**
+ * Reads the next discovery message from the socket input stream.
+ *
+ * @param <T> Type of the expected message.
+ * @return Deserialized message instance.
+ * @throws IgniteCheckedException If deserialization fails.
+ */
+ abstract <T> T readMessage() throws IgniteCheckedException,
IOException;
- // IOException type is important for ServerImpl. It may search
the cause (X.hasCause).
- // The connection error processing behavior depends on it.
- throw new IOException("Received unexpected byte while reading
discovery message: " + serMode);
- }
-
- Message msg =
spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));
+ /** Reads serialized discovery message body when {@link
#MESSAGE_SERIALIZATION} is used. */
+ protected <T> T readDiscoveryMessageBody() throws
IgniteCheckedException {
+ try {
+ Message msg =
spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));
- msgReader.reset();
- msgReader.setBuffer(msgBuf);
+ msgReader.reset();
+ msgReader.setBuffer(msgBuf);
- MessageSerializer msgSer =
spi.messageFactory().serializer(msg.directType());
+ MessageSerializer msgSer =
spi.messageFactory().serializer(msg.directType());
- boolean finished;
+ boolean finished;
- do {
- msgBuf.clear();
+ do {
+ msgBuf.clear();
- int read = in.read(msgBuf.array(), msgBuf.position(),
msgBuf.remaining());
+ int read = in.read(msgBuf.array(), msgBuf.position(),
msgBuf.remaining());
- if (read == -1)
- throw new EOFException("Connection closed before message
was fully read.");
+ if (read == -1)
+ throw new EOFException("Connection closed before
message was fully read.");
- msgBuf.limit(read);
+ msgBuf.limit(read);
- finished = msgSer.readFrom(msg, msgReader);
+ finished = msgSer.readFrom(msg, msgReader);
- // Server Discovery only sends next message to next Server
upon receiving a receipt for the previous one.
- // This behaviour guarantees that we never read a next message
from the buffer right after the end of
- // the previous message. But it is not guaranteed with Client
Discovery where messages aren't acknowledged.
- // Thus, we have to keep the uprocessed bytes read from the
socket. It won't return them again.
- if (msgBuf.hasRemaining()) {
- byte[] unprocessedReadTail = new byte[msgBuf.remaining()];
+ // Server Discovery only sends next message to next Server
upon receiving a receipt for the previous one.
+ // This behaviour guarantees that we never read a next
message from the buffer right after the end of
+ // the previous message. But it is not guaranteed with
Client Discovery where messages aren't acknowledged.
+ // Thus, we have to keep the uprocessed bytes read from
the socket. It won't return them again.
+ if (msgBuf.hasRemaining()) {
+ byte[] unprocessedReadTail = new
byte[msgBuf.remaining()];
- msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining());
+ msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining());
- in.attachByteArray(unprocessedReadTail);
+ in.attachByteArray(unprocessedReadTail);
+ }
}
+ while (!finished);
+
+ return (T)msg;
}
- while (!finished);
+ catch (Exception e) {
+ // Keep logic similar to `U.marshal(...)`.
+ if (e instanceof IgniteCheckedException)
+ throw (IgniteCheckedException)e;
- return (T)msg;
+ throw new IgniteCheckedException(e);
+ }
}
- catch (Exception e) {
- // Keep logic similar to `U.marshal(...)`.
- if (e instanceof IgniteCheckedException)
- throw (IgniteCheckedException)e;
- throw new IgniteCheckedException(e);
+ /** Reads 4-byte header for diagnostics. */
+ protected byte[] readHeader(byte serMode) throws IOException {
Review Comment:
Is colled only from a derived class.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -163,63 +176,123 @@ void writeMessage(TcpDiscoveryAbstractMessage msg)
throws IgniteCheckedException
* @throws IgniteCheckedException If deserialization fails.
*/
<T> T readMessage() throws IgniteCheckedException, IOException {
- byte serMode = (byte)in.read();
+ return reader.readMessage();
+ }
- if (JAVA_SERIALIZATION == serMode)
- return U.unmarshal(spi.marshaller(), in, clsLdr);
+ /** Switches reader to fast mode after initial validated read. */
+ public void switchToFastReader() {
+ reader = fastReader;
+ }
- try {
- if (MESSAGE_SERIALIZATION != serMode) {
- detectSslAlert(serMode, in);
+ /** Base discovery message reader. */
+ private abstract class DiscoveryMessageReader {
Review Comment:
To me, looks as overhead. Too much new code, more complication. We alread
have many `if`s, `switch`es in the message parsing. I belive, one more wouldn't
change the roitine. Instead new subclasses, we might bring to the session some
flag like `checkProtocol`. And just recreate session with/without it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]