rpuch commented on a change in pull request #523:
URL: https://github.com/apache/ignite-3/pull/523#discussion_r774903995



##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/annotations/Marshallable.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation that should be placed on fields of the {@link Transferable} 
classes which store non-{@link Transferable} objects.
+ * This is useful for the user object serialization because we can't generate 
serializer and deserializer for user objects
+ * at the compile time.
+ */
+@Target(ElementType.METHOD)

Review comment:
       The javadoc says 'placed on fields', but here it is `METHOD`. Is 
everything ok?

##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
##########
@@ -272,6 +272,8 @@
     public <M extends Map<?, ?>> M readMap(String name, 
MessageCollectionItemType keyType,
             MessageCollectionItemType valType, boolean linked);
 
+    <T> T readMarshallable(String name);

Review comment:
       The javadoc seems to be missing here.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorFactoryContext.java
##########
@@ -144,4 +144,15 @@ public void addDescriptor(ClassDescriptor descriptor) {
 
         descriptorMap.put(realDescriptorId, descriptor);
     }
+
+    /**
+     * Returns {@code true} if descriptor with the specified descriptor id is 
built-in, {@code false} otherwise.
+     *
+     *
+     * @param descriptorId Descriptor id.
+     * @return Whether descriptor is built-in.
+     */
+    public static boolean isBuiltin(int descriptorId) {

Review comment:
       A minor nitpick: in most of the code (`main` and another PR that is 
currently under review) it's written as 'builtIn' (with a capital 'I'), not 
'builtin'. Let's choose a consistent way to write the word :)

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Per-session serialization service.
+ * Handles (de-)serialization of messages, object (de-)serialization and class 
descriptor merging.
+ */
+public class PerSessionSerializationService {
+    /** Network messages factory. */
+    private static final NetworkMessagesFactory MSG_FACTORY = new 
NetworkMessagesFactory();
+
+    /** Global serialization service. */
+    @NotNull
+    private final SerializationService serializationService;
+
+    /**
+     * Map with merged class descriptors. They are the result of the merging 
of a local and a remote descriptor.
+     * The key in this map is a <b>remote</b> descriptor id.
+     */
+    private final ConcurrentMap<Integer, ClassDescriptor> mergedDescriptorMap 
= new ConcurrentHashMap<>();
+
+    /**
+     * Immutable view over {@link #mergedDescriptorMap}. Used by {@link 
#serializationService}.
+     */
+    private final Map<Integer, ClassDescriptor> descriptorMapView = 
Collections.unmodifiableMap(mergedDescriptorMap);
+
+    public PerSessionSerializationService(@NotNull SerializationService 
serializationService) {
+        this.serializationService = serializationService;
+    }
+
+    /**
+     * Creates a message serializer.
+     *
+     * @see SerializationService#createSerializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageSerializer<T> 
createMessageSerializer(short groupType, short messageType) {
+        return serializationService.createSerializer(groupType, messageType);
+    }
+
+    /**
+     * Creates a message deserializer.
+     *
+     * @see SerializationService#createDeserializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageDeserializer<T> 
createMessageDeserializer(short groupType, short messageType) {
+        return serializationService.createDeserializer(groupType, messageType);
+    }
+
+    /**
+     * Serializes a marshallable object to a byte array.
+     *
+     * @see SerializationService#writeMarshallable(Object)
+     */
+    public <T> SerializationResult writeMarshallable(T marshallable) {
+        return serializationService.writeMarshallable(marshallable);
+    }
+
+    /**
+     * Deserializes a marshallable object from a byte array.
+     *
+     * @see SerializationService#readMarshallable(Map, byte[])
+     */
+    public <T> T readMarshallable(List<ClassDescriptorMessage> 
missingDescriptors, byte[] marshallableData) {
+        mergeDescriptors(missingDescriptors);
+
+        return serializationService.readMarshallable(descriptorMapView, 
marshallableData);
+    }
+
+    /**
+     * Creates a list of messages holding class descriptors.
+     *
+     * @param descriptorIds Ids of class descriptors.
+     * @return List of class descriptor network messages.
+     */
+    public List<ClassDescriptorMessage> 
createClassDescriptorsMessages(List<Integer> descriptorIds) {
+        return descriptorIds.stream().map(id -> {
+            ClassDescriptor descriptor = 
serializationService.getClassDescriptor(id);
+
+            List<FieldDescriptorMessage> fields = descriptor.fields().stream()
+                    .map(d -> {
+                        return MSG_FACTORY.fieldDescriptorMessage()
+                                .name(d.name())
+                                .typeDescriptorId(d.typeDescriptorId())
+                                .className(d.clazz().getName())
+                                .build();
+                    })
+                    .collect(Collectors.toList());
+
+            Serialization serialization = descriptor.serialization();
+
+            return MSG_FACTORY.classDescriptorMessage()
+                    .fields(fields)
+                    .isFinal(descriptor.isFinal())
+                    .serializationType(serialization.type().value())
+                    
.hasSerializationOverride(serialization.hasSerializationOverride())
+                    .hasWriteReplace(serialization.hasWriteReplace())
+                    .hasReadResolve(serialization.hasReadResolve())
+                    .descriptorId(descriptor.descriptorId())
+                    .className(descriptor.className())
+                    .build();
+        }).collect(Collectors.toList());
+    }
+
+    private void mergeDescriptors(List<ClassDescriptorMessage> 
remoteDescriptors) {
+        for (ClassDescriptorMessage clsMsg : remoteDescriptors) {
+            int clsDescriptorId = clsMsg.descriptorId();
+
+            boolean isClsBuiltin = 
serializationService.isBuiltin(clsDescriptorId);
+
+            if (isClsBuiltin) {
+                continue;
+            }
+
+            ClassDescriptor mergedDescriptor = 
mergedDescriptorMap.get(clsDescriptorId);
+
+            if (mergedDescriptor != null) {
+                continue;
+            }
+
+            ClassDescriptor localDescriptor = 
serializationService.getClassDescriptor(clsMsg.className());
+
+            List<FieldDescriptor> fields = 
clsMsg.fields().stream().map(fieldMsg -> {
+                int typeDescriptorId = fieldMsg.typeDescriptorId();
+
+                return new FieldDescriptor(fieldMsg.name(), 
getClass(typeDescriptorId, fieldMsg.className()), typeDescriptorId);
+            }).collect(Collectors.toList());
+
+            SerializationType serializationType = 
SerializationType.getByValue(clsMsg.serializationType());
+            boolean hasSerializationOverride = 
clsMsg.hasSerializationOverride();

Review comment:
       These 3 variables seem redundant: they are 3 more things to keep in mind 
when reading the code. How about inlining them?

##########
File path: 
modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.InboundDecoder;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests marshallable serialization.
+ */
+public class MarshallableTest {
+    /** {@link ByteBuf} allocator. */
+    private final UnpooledByteBufAllocator allocator = 
UnpooledByteBufAllocator.DEFAULT;
+
+    /** Registry. */
+    private final MessageSerializationRegistry registry = new 
TestMessageSerializationRegistryImpl();
+
+    /**
+     * Tests that marshallable object can be serialized along with it's 
descriptor.

Review comment:
       'it's' -> 'its'

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
##########
@@ -1343,6 +1362,81 @@ public IgniteUuid readIgniteUuid() {
         return map0;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <T> void writeMarshallable(T object, MessageWriter writer) {
+        switch (marshallableState) {
+            case 0:

Review comment:
       How about introducing constants with clarifying names for these 0, 1?
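   
   A minimal sketch of what I mean, not a drop-in patch. The class, the 
constant names and the string "steps" are made up for illustration; only 
`marshallableState` comes from the PR:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical two-step writer; all names except marshallableState are illustrative. */
final class MarshallableStateSketch {
    /** State: the class descriptors still have to be written. */
    private static final int WRITING_DESCRIPTORS = 0;

    /** State: the descriptors are done, the serialized object bytes come next. */
    private static final int WRITING_OBJECT = 1;

    private int marshallableState = WRITING_DESCRIPTORS;

    private final List<String> steps = new ArrayList<>();

    /** Runs one full write pass and returns the steps in the order they happened. */
    List<String> write() {
        switch (marshallableState) {
            case WRITING_DESCRIPTORS:
                steps.add("descriptors");
                marshallableState = WRITING_OBJECT;
                // Intentional fall-through to the next state.
            case WRITING_OBJECT:
                steps.add("object");
                // Reset so that the next object starts from the first state.
                marshallableState = WRITING_DESCRIPTORS;
                break;
            default:
                throw new IllegalStateException("Unknown marshallableState: " + marshallableState);
        }

        return steps;
    }
}
```

   With named states, the `case` labels document themselves and the reset at 
the end no longer relies on the reader remembering what `0` stands for.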

##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
##########
@@ -340,4 +340,6 @@
      * Resets this writer.
      */
     public void reset();
+
+    <T> boolean writeMarshallable(String name, T object);

Review comment:
       The javadoc seems to be missing here.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
##########
@@ -1343,6 +1362,81 @@ public IgniteUuid readIgniteUuid() {
         return map0;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <T> void writeMarshallable(T object, MessageWriter writer) {
+        switch (marshallableState) {
+            case 0:
+                if (marshallable == null) {
+                    // If object was not serialized to a byte array, serialize 
it
+                    SerializationResult res = 
serializationService.writeMarshallable(object);
+                    List<Integer> descriptorIds = res.ids();
+                    marshallable = res.array();
+                    // Get descriptors that were not previously sent to the 
remote node
+                    missingDescriptors = 
serializationService.createClassDescriptorsMessages(descriptorIds);
+                }
+
+                writeCollection(missingDescriptors, 
MessageCollectionItemType.MSG, writer);
+
+                if (!lastFinished) {
+                    return;
+                }
+
+                marshallableState++;
+
+                //noinspection fallthrough
+            case 1:
+                writeByteArray(marshallable);
+
+                if (!lastFinished) {
+                    return;
+                }
+
+                marshallable = null;
+                missingDescriptors = null;
+                marshallableState = 0;
+                break;
+
+            default:
+                throw new IllegalArgumentException("Unknown marshallableState: 
" + marshallableState);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> T readMarshallable(MessageReader reader) {
+        switch (marshallableState) {
+            case 0:
+                missingDescriptors = 
readCollection(MessageCollectionItemType.MSG, reader);

Review comment:
       The same field `missingDescriptors` is used both for writing (when it 
holds the descriptors that were not yet sent to the peer) and for reading (when 
it holds the descriptors that THIS node does not yet have).
   
   Would it be better to separate the fields to reduce confusion?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
##########
@@ -136,6 +142,12 @@
 
     private long uuidLocId;
 
+    private int marshallableState;
+
+    private byte[] marshallable;
+
+    private List<ClassDescriptorMessage> missingDescriptors;

Review comment:
       Right now, it is not 100% clear what these `missingDescriptors` mean. 
How about renaming the field to make it clear that these are descriptor 
messages that we have not sent yet but are going to send because the other side 
will need them? Something like `newDescriptorsToSend`?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Per-session serialization service.
+ * Handles (de-)serialization of messages, object (de-)serialization and class 
descriptor merging.
+ */
+public class PerSessionSerializationService {
+    /** Network messages factory. */
+    private static final NetworkMessagesFactory MSG_FACTORY = new 
NetworkMessagesFactory();
+
+    /** Global serialization service. */
+    @NotNull
+    private final SerializationService serializationService;
+
+    /**
+     * Map with merged class descriptors. They are the result of the merging 
of a local and a remote descriptor.
+     * The key in this map is a <b>remote</b> descriptor id.
+     */
+    private final ConcurrentMap<Integer, ClassDescriptor> mergedDescriptorMap 
= new ConcurrentHashMap<>();
+
+    /**
+     * Immutable view over {@link #mergedDescriptorMap}. Used by {@link 
#serializationService}.
+     */
+    private final Map<Integer, ClassDescriptor> descriptorMapView = 
Collections.unmodifiableMap(mergedDescriptorMap);
+
+    public PerSessionSerializationService(@NotNull SerializationService 
serializationService) {
+        this.serializationService = serializationService;
+    }
+
+    /**
+     * Creates a message serializer.
+     *
+     * @see SerializationService#createSerializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageSerializer<T> 
createMessageSerializer(short groupType, short messageType) {
+        return serializationService.createSerializer(groupType, messageType);
+    }
+
+    /**
+     * Creates a message deserializer.
+     *
+     * @see SerializationService#createDeserializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageDeserializer<T> 
createMessageDeserializer(short groupType, short messageType) {
+        return serializationService.createDeserializer(groupType, messageType);
+    }
+
+    /**
+     * Serializes a marshallable object to a byte array.
+     *
+     * @see SerializationService#writeMarshallable(Object)
+     */
+    public <T> SerializationResult writeMarshallable(T marshallable) {
+        return serializationService.writeMarshallable(marshallable);
+    }
+
+    /**
+     * Deserializes a marshallable object from a byte array.
+     *
+     * @see SerializationService#readMarshallable(Map, byte[])
+     */
+    public <T> T readMarshallable(List<ClassDescriptorMessage> 
missingDescriptors, byte[] marshallableData) {
+        mergeDescriptors(missingDescriptors);
+
+        return serializationService.readMarshallable(descriptorMapView, 
marshallableData);
+    }
+
+    /**
+     * Creates a list of messages holding class descriptors.
+     *
+     * @param descriptorIds Ids of class descriptors.
+     * @return List of class descriptor network messages.
+     */
+    public List<ClassDescriptorMessage> 
createClassDescriptorsMessages(List<Integer> descriptorIds) {
+        return descriptorIds.stream().map(id -> {
+            ClassDescriptor descriptor = 
serializationService.getClassDescriptor(id);
+
+            List<FieldDescriptorMessage> fields = descriptor.fields().stream()
+                    .map(d -> {
+                        return MSG_FACTORY.fieldDescriptorMessage()
+                                .name(d.name())
+                                .typeDescriptorId(d.typeDescriptorId())
+                                .className(d.clazz().getName())
+                                .build();
+                    })
+                    .collect(Collectors.toList());
+
+            Serialization serialization = descriptor.serialization();
+
+            return MSG_FACTORY.classDescriptorMessage()
+                    .fields(fields)
+                    .isFinal(descriptor.isFinal())
+                    .serializationType(serialization.type().value())
+                    
.hasSerializationOverride(serialization.hasSerializationOverride())
+                    .hasWriteReplace(serialization.hasWriteReplace())
+                    .hasReadResolve(serialization.hasReadResolve())
+                    .descriptorId(descriptor.descriptorId())
+                    .className(descriptor.className())
+                    .build();
+        }).collect(Collectors.toList());
+    }
+
+    private void mergeDescriptors(List<ClassDescriptorMessage> 
remoteDescriptors) {
+        for (ClassDescriptorMessage clsMsg : remoteDescriptors) {
+            int clsDescriptorId = clsMsg.descriptorId();
+
+            boolean isClsBuiltin = 
serializationService.isBuiltin(clsDescriptorId);
+
+            if (isClsBuiltin) {
+                continue;
+            }
+
+            ClassDescriptor mergedDescriptor = 
mergedDescriptorMap.get(clsDescriptorId);
+
+            if (mergedDescriptor != null) {

Review comment:
       Shouldn't `mergedDescriptorMap.computeIfAbsent(...)` be used here?
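   
   Roughly like this. It is only a sketch: the class name, the `String` values 
and the merge counter are stand-ins for illustration, not the PR's real types:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical cache keyed by remote descriptor id; String stands in for ClassDescriptor. */
final class MergedDescriptorCacheSketch {
    private final ConcurrentMap<Integer, String> mergedDescriptorMap = new ConcurrentHashMap<>();

    /** Counts how many times the (stand-in) merge actually ran. */
    private final AtomicInteger mergeCount = new AtomicInteger();

    /** Returns the cached entry for the id, running the merge only when no entry exists yet. */
    String mergeIfAbsent(int remoteDescriptorId, String className) {
        return mergedDescriptorMap.computeIfAbsent(remoteDescriptorId, id -> {
            mergeCount.incrementAndGet();

            // Placeholder for merging the local and the remote descriptor.
            return "merged:" + className;
        });
    }

    int mergeCount() {
        return mergeCount.get();
    }
}
```

   This replaces the get / null-check / put sequence with one atomic call. One 
caveat: with `ConcurrentHashMap`, the mapping function must not update the same 
map, so this only works if the merge does not add other descriptors to 
`mergedDescriptorMap` while it runs.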

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Per-session serialization service.
+ * Handles (de-)serialization of messages, object (de-)serialization and class 
descriptor merging.
+ */
+public class PerSessionSerializationService {
+    /** Network messages factory. */
+    private static final NetworkMessagesFactory MSG_FACTORY = new 
NetworkMessagesFactory();
+
+    /** Global serialization service. */
+    @NotNull
+    private final SerializationService serializationService;
+
+    /**
+     * Map with merged class descriptors. They are the result of the merging 
of a local and a remote descriptor.
+     * The key in this map is a <b>remote</b> descriptor id.
+     */
+    private final ConcurrentMap<Integer, ClassDescriptor> mergedDescriptorMap 
= new ConcurrentHashMap<>();
+
+    /**
+     * Immutable view over {@link #mergedDescriptorMap}. Used by {@link 
#serializationService}.
+     */
+    private final Map<Integer, ClassDescriptor> descriptorMapView = 
Collections.unmodifiableMap(mergedDescriptorMap);
+
+    public PerSessionSerializationService(@NotNull SerializationService 
serializationService) {
+        this.serializationService = serializationService;
+    }
+
+    /**
+     * Creates a message serializer.
+     *
+     * @see SerializationService#createSerializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageSerializer<T> 
createMessageSerializer(short groupType, short messageType) {
+        return serializationService.createSerializer(groupType, messageType);
+    }
+
+    /**
+     * Creates a message deserializer.
+     *
+     * @see SerializationService#createDeserializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageDeserializer<T> 
createMessageDeserializer(short groupType, short messageType) {
+        return serializationService.createDeserializer(groupType, messageType);
+    }
+
+    /**
+     * Serializes a marshallable object to a byte array.
+     *
+     * @see SerializationService#writeMarshallable(Object)
+     */
+    public <T> SerializationResult writeMarshallable(T marshallable) {
+        return serializationService.writeMarshallable(marshallable);
+    }
+
+    /**
+     * Deserializes a marshallable object from a byte array.
+     *
+     * @see SerializationService#readMarshallable(Map, byte[])
+     */
+    public <T> T readMarshallable(List<ClassDescriptorMessage> 
missingDescriptors, byte[] marshallableData) {
+        mergeDescriptors(missingDescriptors);
+
+        return serializationService.readMarshallable(descriptorMapView, 
marshallableData);
+    }
+
+    /**
+     * Creates a list of messages holding class descriptors.
+     *
+     * @param descriptorIds Ids of class descriptors.
+     * @return List of class descriptor network messages.
+     */
+    public List<ClassDescriptorMessage> 
createClassDescriptorsMessages(List<Integer> descriptorIds) {
+        return descriptorIds.stream().map(id -> {
+            ClassDescriptor descriptor = 
serializationService.getClassDescriptor(id);
+
+            List<FieldDescriptorMessage> fields = descriptor.fields().stream()
+                    .map(d -> {
+                        return MSG_FACTORY.fieldDescriptorMessage()
+                                .name(d.name())
+                                .typeDescriptorId(d.typeDescriptorId())
+                                .className(d.clazz().getName())
+                                .build();
+                    })
+                    .collect(Collectors.toList());
+
+            Serialization serialization = descriptor.serialization();
+
+            return MSG_FACTORY.classDescriptorMessage()
+                    .fields(fields)
+                    .isFinal(descriptor.isFinal())
+                    .serializationType(serialization.type().value())
+                    
.hasSerializationOverride(serialization.hasSerializationOverride())
+                    .hasWriteReplace(serialization.hasWriteReplace())
+                    .hasReadResolve(serialization.hasReadResolve())
+                    .descriptorId(descriptor.descriptorId())
+                    .className(descriptor.className())
+                    .build();
+        }).collect(Collectors.toList());

Review comment:
       A little suggestion: if `toList()` is imported statically, then 
`.collect(toList())` reads just like plain English, which helps readability. 
Besides, everyone knows that `toList()` (in the context of collectors) always 
comes from `Collectors`, so the qualifier is redundant information anyway.
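   
   For illustration, a tiny self-contained sketch (the class and method names 
are made up, only the static import of `java.util.stream.Collectors.toList` is 
the point):

```java
import static java.util.stream.Collectors.toList;

import java.util.List;

/** Hypothetical helper showing the statically imported collector. */
final class StaticToListSketch {
    /** Maps classes to their fully qualified names. */
    static List<String> classNames(List<Class<?>> classes) {
        return classes.stream()
                .map(Class::getName)
                .collect(toList());
    }
}
```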

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
##########
@@ -144,10 +156,12 @@
     /**
      * Constructor.
      *
-     * @param serializationRegistry Message mappers.
+     * @param serializationService Serialization service.       .
      */
-    public DirectByteBufferStreamImplV1(MessageSerializationRegistry 
serializationRegistry) {
-        this.serializationRegistry = serializationRegistry;
+    public DirectByteBufferStreamImplV1(
+            PerSessionSerializationService serializationService

Review comment:
       Are these line wraps really needed? Everything seems to fit on just one 
line.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Per-session serialization service.
+ * Handles (de-)serialization of messages, object (de-)serialization and class 
descriptor merging.
+ */
+public class PerSessionSerializationService {
+    /** Network messages factory. */
+    private static final NetworkMessagesFactory MSG_FACTORY = new 
NetworkMessagesFactory();
+
+    /** Global serialization service. */
+    @NotNull
+    private final SerializationService serializationService;
+
+    /**
+     * Map with merged class descriptors. They are the result of the merging 
of a local and a remote descriptor.
+     * The key in this map is a <b>remote</b> descriptor id.
+     */
+    private final ConcurrentMap<Integer, ClassDescriptor> mergedDescriptorMap 
= new ConcurrentHashMap<>();
+
+    /**
+     * Immutable view over {@link #mergedDescriptorMap}. Used by {@link 
#serializationService}.
+     */
+    private final Map<Integer, ClassDescriptor> descriptorMapView = 
Collections.unmodifiableMap(mergedDescriptorMap);
+
+    public PerSessionSerializationService(@NotNull SerializationService 
serializationService) {
+        this.serializationService = serializationService;
+    }
+
+    /**
+     * Creates a message serializer.
+     *
+     * @see SerializationService#createSerializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageSerializer<T> 
createMessageSerializer(short groupType, short messageType) {
+        return serializationService.createSerializer(groupType, messageType);
+    }
+
+    /**
+     * Creates a message deserializer.
+     *
+     * @see SerializationService#createDeserializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageDeserializer<T> 
createMessageDeserializer(short groupType, short messageType) {
+        return serializationService.createDeserializer(groupType, messageType);
+    }
+
+    /**
+     * Serializes a marshallable object to a byte array.
+     *
+     * @see SerializationService#writeMarshallable(Object)
+     */
+    public <T> SerializationResult writeMarshallable(T marshallable) {
+        return serializationService.writeMarshallable(marshallable);
+    }
+
+    /**
+     * Deserializes a marshallable object from a byte array.
+     *
+     * @see SerializationService#readMarshallable(Map, byte[])
+     */
+    public <T> T readMarshallable(List<ClassDescriptorMessage> 
missingDescriptors, byte[] marshallableData) {
+        mergeDescriptors(missingDescriptors);
+
+        return serializationService.readMarshallable(descriptorMapView, 
marshallableData);
+    }
+
+    /**
+     * Creates a list of messages holding class descriptors.
+     *
+     * @param descriptorIds Ids of class descriptors.
+     * @return List of class descriptor network messages.
+     */
+    public List<ClassDescriptorMessage> 
createClassDescriptorsMessages(List<Integer> descriptorIds) {
+        return descriptorIds.stream().map(id -> {
+            ClassDescriptor descriptor = 
serializationService.getClassDescriptor(id);
+
+            List<FieldDescriptorMessage> fields = descriptor.fields().stream()
+                    .map(d -> {
+                        return MSG_FACTORY.fieldDescriptorMessage()
+                                .name(d.name())
+                                .typeDescriptorId(d.typeDescriptorId())
+                                .className(d.clazz().getName())
+                                .build();
+                    })
+                    .collect(Collectors.toList());
+
+            Serialization serialization = descriptor.serialization();
+
+            return MSG_FACTORY.classDescriptorMessage()
+                    .fields(fields)
+                    .isFinal(descriptor.isFinal())
+                    .serializationType(serialization.type().value())
+                    
.hasSerializationOverride(serialization.hasSerializationOverride())
+                    .hasWriteReplace(serialization.hasWriteReplace())
+                    .hasReadResolve(serialization.hasReadResolve())
+                    .descriptorId(descriptor.descriptorId())
+                    .className(descriptor.className())
+                    .build();
+        }).collect(Collectors.toList());
+    }
+
+    private void mergeDescriptors(List<ClassDescriptorMessage> 
remoteDescriptors) {
+        for (ClassDescriptorMessage clsMsg : remoteDescriptors) {
+            int clsDescriptorId = clsMsg.descriptorId();
+
+            boolean isClsBuiltin = 
serializationService.isBuiltin(clsDescriptorId);
+
+            if (isClsBuiltin) {
+                continue;
+            }
+
+            ClassDescriptor mergedDescriptor = 
mergedDescriptorMap.get(clsDescriptorId);
+
+            if (mergedDescriptor != null) {
+                continue;
+            }
+
+            ClassDescriptor localDescriptor = 
serializationService.getClassDescriptor(clsMsg.className());
+
+            List<FieldDescriptor> fields = 
clsMsg.fields().stream().map(fieldMsg -> {
+                int typeDescriptorId = fieldMsg.typeDescriptorId();
+
+                return new FieldDescriptor(fieldMsg.name(), 
getClass(typeDescriptorId, fieldMsg.className()), typeDescriptorId);
+            }).collect(Collectors.toList());
+
+            SerializationType serializationType = 
SerializationType.getByValue(clsMsg.serializationType());
+            boolean hasSerializationOverride = 
clsMsg.hasSerializationOverride();
+            boolean hasWriteReplace = clsMsg.hasWriteReplace();
+            boolean hasReadResolve = clsMsg.hasReadResolve();
+
+            var serialization = new Serialization(serializationType, 
hasSerializationOverride, hasWriteReplace, hasReadResolve);
+
+            ClassDescriptor remoteDescriptor = new ClassDescriptor(

Review comment:
       I suggest extracting the 'message to descriptor' conversion code to a 
method of its own (like `descriptorFromMessage()`) so that it does not clutter 
this method (which already has a fair amount of logic).
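   
   A rough sketch of the shape I have in mind. It uses simplified, hypothetical 
stand-in types (`FieldMsg`, `ClassMsg`, `Descriptor`) rather than the real 
message and descriptor classes, and it leaves out the builtin check and the 
merge with the local descriptor, so treat it as an illustration only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only: simplified stand-ins, not the PR's real classes. */
final class DescriptorMergeSketch {
    /** Hypothetical stand-in for a field descriptor message. */
    static final class FieldMsg {
        final String name;
        final int typeDescriptorId;

        FieldMsg(String name, int typeDescriptorId) {
            this.name = name;
            this.typeDescriptorId = typeDescriptorId;
        }
    }

    /** Hypothetical stand-in for a class descriptor message. */
    static final class ClassMsg {
        final int descriptorId;
        final String className;
        final List<FieldMsg> fields;

        ClassMsg(int descriptorId, String className, List<FieldMsg> fields) {
            this.descriptorId = descriptorId;
            this.className = className;
            this.fields = fields;
        }
    }

    /** Hypothetical stand-in for a class descriptor. */
    static final class Descriptor {
        final int descriptorId;
        final String className;
        final List<String> remoteFieldNames;

        Descriptor(int descriptorId, String className, List<String> remoteFieldNames) {
            this.descriptorId = descriptorId;
            this.className = className;
            this.remoteFieldNames = remoteFieldNames;
        }
    }

    /** Descriptors keyed by the remote descriptor id. */
    private final Map<Integer, Descriptor> merged = new ConcurrentHashMap<>();

    /** The loop now only decides whether to skip or to store a descriptor. */
    void mergeDescriptors(List<ClassMsg> remoteDescriptors) {
        for (ClassMsg clsMsg : remoteDescriptors) {
            if (merged.containsKey(clsMsg.descriptorId)) {
                continue; // Already handled in this session.
            }

            merged.put(clsMsg.descriptorId, descriptorFromMessage(clsMsg));
        }
    }

    /** Pure conversion, extracted so it can be read and tested on its own. */
    static Descriptor descriptorFromMessage(ClassMsg clsMsg) {
        List<String> remoteFields = new ArrayList<>();

        for (FieldMsg fieldMsg : clsMsg.fields) {
            remoteFields.add(fieldMsg.name);
        }

        return new Descriptor(clsMsg.descriptorId, clsMsg.className, remoteFields);
    }

    /** Returns the stored descriptor for a remote id, or null if there is none. */
    Descriptor merged(int remoteDescriptorId) {
        return merged.get(remoteDescriptorId);
    }
}
```

   With the conversion in a static method, it can be unit-tested without 
setting up a session or a serialization service.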

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Per-session serialization service.
+ * Handles (de-)serialization of messages, object (de-)serialization and class 
descriptor merging.
+ */
+public class PerSessionSerializationService {
+    /** Network messages factory. */
+    private static final NetworkMessagesFactory MSG_FACTORY = new 
NetworkMessagesFactory();
+
+    /** Global serialization service. */
+    @NotNull
+    private final SerializationService serializationService;
+
+    /**
+     * Map with merged class descriptors. They are the result of the merging 
of a local and a remote descriptor.
+     * The key in this map is a <b>remote</b> descriptor id.
+     */
+    private final ConcurrentMap<Integer, ClassDescriptor> mergedDescriptorMap 
= new ConcurrentHashMap<>();
+
+    /**
+     * Immutable view over {@link #mergedDescriptorMap}. Used by {@link 
#serializationService}.
+     */
+    private final Map<Integer, ClassDescriptor> descriptorMapView = 
Collections.unmodifiableMap(mergedDescriptorMap);
+
+    public PerSessionSerializationService(@NotNull SerializationService 
serializationService) {
+        this.serializationService = serializationService;
+    }
+
+    /**
+     * Creates a message serializer.
+     *
+     * @see SerializationService#createSerializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageSerializer<T> 
createMessageSerializer(short groupType, short messageType) {
+        return serializationService.createSerializer(groupType, messageType);
+    }
+
+    /**
+     * Creates a message deserializer.
+     *
+     * @see SerializationService#createDeserializer(short, short)
+     */
+    public <T extends NetworkMessage> MessageDeserializer<T> 
createMessageDeserializer(short groupType, short messageType) {
+        return serializationService.createDeserializer(groupType, messageType);
+    }
+
+    /**
+     * Serializes a marshallable object to a byte array.
+     *
+     * @see SerializationService#writeMarshallable(Object)
+     */
+    public <T> SerializationResult writeMarshallable(T marshallable) {
+        return serializationService.writeMarshallable(marshallable);
+    }
+
+    /**
+     * Deserializes a marshallable object from a byte array.
+     *
+     * @see SerializationService#readMarshallable(Map, byte[])
+     */
+    public <T> T readMarshallable(List<ClassDescriptorMessage> 
missingDescriptors, byte[] marshallableData) {
+        mergeDescriptors(missingDescriptors);
+
+        return serializationService.readMarshallable(descriptorMapView, 
marshallableData);
+    }
+
+    /**
+     * Creates a list of messages holding class descriptors.
+     *
+     * @param descriptorIds Ids of class descriptors.
+     * @return List of class descriptor network messages.
+     */
+    public List<ClassDescriptorMessage> 
createClassDescriptorsMessages(List<Integer> descriptorIds) {
+        return descriptorIds.stream().map(id -> {
+            ClassDescriptor descriptor = 
serializationService.getClassDescriptor(id);
+
+            List<FieldDescriptorMessage> fields = descriptor.fields().stream()
+                    .map(d -> {
+                        return MSG_FACTORY.fieldDescriptorMessage()
+                                .name(d.name())
+                                .typeDescriptorId(d.typeDescriptorId())
+                                .className(d.clazz().getName())
+                                .build();
+                    })
+                    .collect(Collectors.toList());
+
+            Serialization serialization = descriptor.serialization();
+
+            return MSG_FACTORY.classDescriptorMessage()
+                    .fields(fields)
+                    .isFinal(descriptor.isFinal())
+                    .serializationType(serialization.type().value())
+                    
.hasSerializationOverride(serialization.hasSerializationOverride())
+                    .hasWriteReplace(serialization.hasWriteReplace())
+                    .hasReadResolve(serialization.hasReadResolve())
+                    .descriptorId(descriptor.descriptorId())
+                    .className(descriptor.className())
+                    .build();
+        }).collect(Collectors.toList());
+    }
+
+    private void mergeDescriptors(List<ClassDescriptorMessage> 
remoteDescriptors) {
+        for (ClassDescriptorMessage clsMsg : remoteDescriptors) {
+            int clsDescriptorId = clsMsg.descriptorId();
+
+            boolean isClsBuiltin = 
serializationService.isBuiltin(clsDescriptorId);
+
+            if (isClsBuiltin) {
+                continue;
+            }
+
+            ClassDescriptor mergedDescriptor = 
mergedDescriptorMap.get(clsDescriptorId);
+
+            if (mergedDescriptor != null) {
+                continue;
+            }
+
+            ClassDescriptor localDescriptor = 
serializationService.getClassDescriptor(clsMsg.className());
+
+            List<FieldDescriptor> fields = 
clsMsg.fields().stream().map(fieldMsg -> {

Review comment:
       How about naming the variable `remoteFields` for clarity?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
##########
@@ -1343,6 +1362,81 @@ public IgniteUuid readIgniteUuid() {
         return map0;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <T> void writeMarshallable(T object, MessageWriter writer) {
+        switch (marshallableState) {
+            case 0:
+                if (marshallable == null) {
+                    // If object was not serialized to a byte array, serialize 
it
+                    SerializationResult res = 
serializationService.writeMarshallable(object);
+                    List<Integer> descriptorIds = res.ids();
+                    marshallable = res.array();
+                    // Get descriptors that were not previously sent to the 
remote node
+                    missingDescriptors = 
serializationService.createClassDescriptorsMessages(descriptorIds);
+                }
+
+                writeCollection(missingDescriptors, 
MessageCollectionItemType.MSG, writer);
+
+                if (!lastFinished) {

Review comment:
       I need clarification on how this works. If `lastFinished` is false, 
will the write attempt be retried?

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
##########
@@ -399,6 +400,7 @@ private ConnectionManager startManager(
             ClientStageFail clientHandshakeFailAt
     ) {
         var registry = new TestMessageSerializationRegistryImpl();
+        var serializationService = new SerializationService(registry, null);

Review comment:
       If the second argument is optional, then how about adding another 
constructor with just the first argument? Also, in such a case it makes sense to 
annotate the second argument as `@Nullable`.
   
   Or is it just for tests where we are sure that they will not use the 
second argument?
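   
   If it is optional, something along these lines would work. This is a 
minimal sketch with a hypothetical stand-in class and `Object`-typed 
parameters, not the real `SerializationService`; the `@Nullable` annotation is 
shown as a comment only to keep the snippet free of dependencies:

```java
import java.util.Objects;

/** Sketch only: a simplified stand-in for the service under review. */
final class SerializationServiceSketch {
    /** Message serialization registry, always required. */
    private final Object registry;

    /** User object serializer; null when user objects are never marshalled. */
    private final Object userObjectSerializer;

    /** Convenience constructor for callers that do not need user object serialization. */
    SerializationServiceSketch(Object registry) {
        this(registry, null);
    }

    /** Full constructor; the second argument would carry @Nullable in real code. */
    SerializationServiceSketch(Object registry, /* @Nullable */ Object userObjectSerializer) {
        this.registry = Objects.requireNonNull(registry, "registry");
        this.userObjectSerializer = userObjectSerializer;
    }

    /** Tells whether a user object serializer was supplied. */
    boolean supportsUserObjects() {
        return userObjectSerializer != null;
    }
}
```

   Call sites like this test would then read `new SerializationService(registry)` 
instead of passing an explicit `null`.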

##########
File path: 
modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.InboundDecoder;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests marshallable serialization.
+ */
+public class MarshallableTest {
+    /** {@link ByteBuf} allocator. */
+    private final UnpooledByteBufAllocator allocator = 
UnpooledByteBufAllocator.DEFAULT;
+
+    /** Registry. */
+    private final MessageSerializationRegistry registry = new 
TestMessageSerializationRegistryImpl();
+
+    /**
+     * Tests that marshallable object can be serialized along with it's 
descriptor.
+     */
+    @Test
+    public void testMarshallable() {
+        TestMessagesFactory msgFactory = new TestMessagesFactory();
+
+        ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);

Review comment:
       Same as for `toList()` and `assertXXX()`: it seems reasonable to 
statically import all `Mockito` methods because everyone understands that, as 
we are in the context of tests, `mock()` means `Mockito.mock()` and `when()` 
comes from `Mockito`. At the same time, this reduces the noise and clutter in 
the code. Also, `doReturn(a).when(b).call()` reads as English prose, while 
`Mockito.doReturn(a).when(b).call()` certainly does not.

##########
File path: 
modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.InboundDecoder;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests marshallable serialization.
+ */
+public class MarshallableTest {
+    /** {@link ByteBuf} allocator. */
+    private final UnpooledByteBufAllocator allocator = 
UnpooledByteBufAllocator.DEFAULT;
+
+    /** Registry. */
+    private final MessageSerializationRegistry registry = new 
TestMessageSerializationRegistryImpl();
+
+    /**
+     * Tests that marshallable object can be serialized along with it's 
descriptor.
+     */
+    @Test
+    public void testMarshallable() {
+        TestMessagesFactory msgFactory = new TestMessagesFactory();
+
+        ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
+
+        var channel = new EmbeddedChannel();
+
+        Mockito.doReturn(channel).when(ctx).channel();
+
+        ClassDescriptorFactoryContext descriptorContext = new 
ClassDescriptorFactoryContext();
+        ClassDescriptorFactory factory = new 
ClassDescriptorFactory(descriptorContext);
+
+        // Creates descriptor for SimpleSerializableObject
+        ClassDescriptor descriptor = 
factory.create(SimpleSerializableObject.class);
+
+        // Stub implementation of the serializer, uses standard JDK 
serializable serialization to actually marshall an object
+        UserObjectSerializer userObjectSerializer = new UserObjectSerializer() 
{
+            @Override
+            public <T> T read(Map<Integer, ClassDescriptor> descriptor, byte[] 
array) {
+                try (ByteArrayInputStream bais = new 
ByteArrayInputStream(array); ObjectInputStream ois = new 
ObjectInputStream(bais)) {
+                    return (T) ois.readObject();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public <T> SerializationResult write(T object) {
+                try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                    oos.writeObject(object);
+                    oos.close();
+                    return new SerializationResult(baos.toByteArray(), 
Collections.singletonList(descriptor.descriptorId()));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public ClassDescriptor getClassDescriptor(int typeDescriptorId) {
+                return descriptorContext.getDescriptor(typeDescriptorId);
+            }
+
+            @Override
+            public ClassDescriptor getClassDescriptor(String typeName) {
+                assertEquals(descriptor.className(), typeName);
+
+                return descriptor;
+            }
+
+            @Override
+            public boolean isBuiltin(int typeDescriptorId) {
+                return 
ClassDescriptorFactoryContext.isBuiltin(typeDescriptorId);
+            }
+        };
+
+        var serializationService = new SerializationService(registry, 
userObjectSerializer);
+        var perSessionSerializationService = new 
PerSessionSerializationService(serializationService);
+        final var decoder = new InboundDecoder(perSessionSerializationService);
+
+        // List that holds decoded object
+        final var list = new ArrayList<>();
+
+        var writer = new DirectMessageWriter(perSessionSerializationService, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+
+        // Test map that will be sent as a Marshallable object within the 
MessageWithMarshallable message
+        Map<String, SimpleSerializableObject> testMap = Map.of("test", new 
SimpleSerializableObject(10));
+
+        MessageWithMarshallable msg = 
msgFactory.messageWithMarshallable().marshallableMap(testMap).build();
+
+        MessageSerializer<NetworkMessage> serializer = 
registry.createSerializer(msg.groupType(), msg.messageType());
+
+        ByteBuffer nioBuffer = ByteBuffer.allocate(10_000);
+
+        writer.setBuffer(nioBuffer);
+
+        // Write a message to the ByteBuffer.
+        boolean fullyWritten = serializer.writeMessage(msg, writer);
+
+        assertTrue(fullyWritten);
+
+        int size = nioBuffer.position();
+
+        nioBuffer.flip();
+
+        ByteBuf buffer = allocator.buffer();
+
+        for (int i = 0; i < size; i++) {
+            // Write bytes to a decoding buffer one by one
+            buffer.writeByte(nioBuffer.get());
+
+            decoder.decode(ctx, buffer, list);
+
+            if (i < size - 1) {
+                // Any time before the buffer is fully read, message object 
should not be decoded
+                assertEquals(0, list.size());
+            }
+        }
+
+        // Buffer is fully read, message object should be decoded
+        assertEquals(1, list.size());

Review comment:
       ```
   assertThat(list, hasSize(1));
   ```

##########
File path: 
modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.InboundDecoder;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests marshallable serialization.
+ */
+public class MarshallableTest {
+    /** {@link ByteBuf} allocator. */
+    private final UnpooledByteBufAllocator allocator = 
UnpooledByteBufAllocator.DEFAULT;
+
+    /** Registry. */
+    private final MessageSerializationRegistry registry = new 
TestMessageSerializationRegistryImpl();
+
+    /**
+     * Tests that marshallable object can be serialized along with it's 
descriptor.
+     */
+    @Test
+    public void testMarshallable() {
+        TestMessagesFactory msgFactory = new TestMessagesFactory();
+
+        ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
+
+        var channel = new EmbeddedChannel();
+
+        Mockito.doReturn(channel).when(ctx).channel();
+
+        ClassDescriptorFactoryContext descriptorContext = new 
ClassDescriptorFactoryContext();
+        ClassDescriptorFactory factory = new 
ClassDescriptorFactory(descriptorContext);
+
+        // Creates descriptor for SimpleSerializableObject
+        ClassDescriptor descriptor = 
factory.create(SimpleSerializableObject.class);
+
+        // Stub implementation of the serializer, uses standard JDK 
serializable serialization to actually marshall an object
+        UserObjectSerializer userObjectSerializer = new UserObjectSerializer() 
{
+            @Override
+            public <T> T read(Map<Integer, ClassDescriptor> descriptor, byte[] 
array) {
+                try (ByteArrayInputStream bais = new 
ByteArrayInputStream(array); ObjectInputStream ois = new 
ObjectInputStream(bais)) {
+                    return (T) ois.readObject();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public <T> SerializationResult write(T object) {
+                try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                    oos.writeObject(object);
+                    oos.close();
+                    return new SerializationResult(baos.toByteArray(), 
Collections.singletonList(descriptor.descriptorId()));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public ClassDescriptor getClassDescriptor(int typeDescriptorId) {
+                return descriptorContext.getDescriptor(typeDescriptorId);
+            }
+
+            @Override
+            public ClassDescriptor getClassDescriptor(String typeName) {
+                assertEquals(descriptor.className(), typeName);
+
+                return descriptor;
+            }
+
+            @Override
+            public boolean isBuiltin(int typeDescriptorId) {
+                return 
ClassDescriptorFactoryContext.isBuiltin(typeDescriptorId);
+            }
+        };
+
+        var serializationService = new SerializationService(registry, 
userObjectSerializer);
+        var perSessionSerializationService = new 
PerSessionSerializationService(serializationService);
+        final var decoder = new InboundDecoder(perSessionSerializationService);
+
+        // List that holds decoded object
+        final var list = new ArrayList<>();
+
+        var writer = new DirectMessageWriter(perSessionSerializationService, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+
+        // Test map that will be sent as a Marshallable object within the 
MessageWithMarshallable message
+        Map<String, SimpleSerializableObject> testMap = Map.of("test", new 
SimpleSerializableObject(10));
+
+        MessageWithMarshallable msg = 
msgFactory.messageWithMarshallable().marshallableMap(testMap).build();
+
+        MessageSerializer<NetworkMessage> serializer = 
registry.createSerializer(msg.groupType(), msg.messageType());
+
+        ByteBuffer nioBuffer = ByteBuffer.allocate(10_000);
+
+        writer.setBuffer(nioBuffer);
+
+        // Write a message to the ByteBuffer.
+        boolean fullyWritten = serializer.writeMessage(msg, writer);
+
+        assertTrue(fullyWritten);
+
+        int size = nioBuffer.position();
+
+        nioBuffer.flip();
+
+        ByteBuf buffer = allocator.buffer();
+
+        for (int i = 0; i < size; i++) {
+            // Write bytes to a decoding buffer one by one
+            buffer.writeByte(nioBuffer.get());
+
+            decoder.decode(ctx, buffer, list);
+
+            if (i < size - 1) {
+                // Any time before the buffer is fully read, message object 
should not be decoded
+                assertEquals(0, list.size());

Review comment:
       This can be replaced with the (arguably more readable)
   
   ```
   assertThat(list, is(empty()));
   ```
   
   Plain English prose instead of a techy expression again :)

##########
File path: 
modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.InboundDecoder;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.serialization.MessageSerializer;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests marshallable serialization.
+ */
+public class MarshallableTest {
+    /** {@link ByteBuf} allocator. */
+    private final UnpooledByteBufAllocator allocator = 
UnpooledByteBufAllocator.DEFAULT;
+
+    /** Registry. */
+    private final MessageSerializationRegistry registry = new 
TestMessageSerializationRegistryImpl();
+
+    /**
+     * Tests that a marshallable object can be serialized along with its 
descriptor.
+     */
+    @Test
+    public void testMarshallable() {
+        TestMessagesFactory msgFactory = new TestMessagesFactory();
+
+        ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
+
+        var channel = new EmbeddedChannel();
+
+        Mockito.doReturn(channel).when(ctx).channel();
+
+        ClassDescriptorFactoryContext descriptorContext = new 
ClassDescriptorFactoryContext();
+        ClassDescriptorFactory factory = new 
ClassDescriptorFactory(descriptorContext);
+
+        // Creates descriptor for SimpleSerializableObject
+        ClassDescriptor descriptor = 
factory.create(SimpleSerializableObject.class);
+
+        // Stub implementation of the serializer, uses standard JDK 
serializable serialization to actually marshall an object
+        UserObjectSerializer userObjectSerializer = new UserObjectSerializer() 
{
+            @Override
+            public <T> T read(Map<Integer, ClassDescriptor> descriptor, byte[] 
array) {
+                try (ByteArrayInputStream bais = new 
ByteArrayInputStream(array); ObjectInputStream ois = new 
ObjectInputStream(bais)) {
+                    return (T) ois.readObject();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public <T> SerializationResult write(T object) {
+                try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                    oos.writeObject(object);
+                    oos.close();
+                    return new SerializationResult(baos.toByteArray(), 
Collections.singletonList(descriptor.descriptorId()));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public ClassDescriptor getClassDescriptor(int typeDescriptorId) {
+                return descriptorContext.getDescriptor(typeDescriptorId);
+            }
+
+            @Override
+            public ClassDescriptor getClassDescriptor(String typeName) {
+                assertEquals(descriptor.className(), typeName);
+
+                return descriptor;
+            }
+
+            @Override
+            public boolean isBuiltin(int typeDescriptorId) {
+                return 
ClassDescriptorFactoryContext.isBuiltin(typeDescriptorId);
+            }
+        };
+
+        var serializationService = new SerializationService(registry, 
userObjectSerializer);
+        var perSessionSerializationService = new 
PerSessionSerializationService(serializationService);
+        final var decoder = new InboundDecoder(perSessionSerializationService);
+
+        // List that holds decoded object
+        final var list = new ArrayList<>();
+
+        var writer = new DirectMessageWriter(perSessionSerializationService, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);

Review comment:
       I suggest extracting different stages of the test to methods of their 
own, so that the high-level structure of the test (the script in large blocks) 
becomes easily visible. For example, methods for writing a message, for reading 
a message, and so on could be extracted.
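
A rough sketch of the shape this suggestion points at, not the actual test. Everything here is a hypothetical stand-in: `ToyDecoder` replaces `InboundDecoder`, a length-prefixed string replaces the real message format, and the method names `writeMessage`, `readMessageByteByByte` and `roundTrip` are only illustrative. The point is that each stage lives in its own method, so the top-level script reads in large blocks.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Sketch only: toy stand-ins for the real writer and decoder, to show the test's structure. */
class StagedTestSketch {
    /** Hypothetical stand-in for InboundDecoder: decodes one length-prefixed UTF-8 string. */
    static final class ToyDecoder {
        private final ByteBuffer acc = ByteBuffer.allocate(1024);

        void decode(byte b, List<Object> out) {
            acc.put(b);
            if (acc.position() < Integer.BYTES) {
                return; // The length prefix is not complete yet.
            }
            int len = acc.getInt(0);
            if (acc.position() == Integer.BYTES + len) {
                out.add(new String(acc.array(), Integer.BYTES, len, StandardCharsets.UTF_8));
            }
        }
    }

    /** Stage 1: serialize the message into a buffer that is ready for reading. */
    static ByteBuffer writeMessage(String msg) {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + payload.length);
        buf.putInt(payload.length).put(payload);
        buf.flip();
        return buf;
    }

    /** Stage 2: feed bytes one by one; nothing may be decoded before the last byte arrives. */
    static List<Object> readMessageByteByByte(ByteBuffer written, ToyDecoder decoder) {
        List<Object> decoded = new ArrayList<>();
        while (written.hasRemaining()) {
            decoder.decode(written.get(), decoded);
            if (written.hasRemaining() && !decoded.isEmpty()) {
                throw new AssertionError("decoded before the buffer was fully read");
            }
        }
        return decoded;
    }

    /** The test body itself: a short script made of the stages above. */
    static Object roundTrip(String msg) {
        ByteBuffer written = writeMessage(msg);
        List<Object> decoded = readMessageByteByByte(written, new ToyDecoder());
        if (decoded.size() != 1) {
            throw new AssertionError("expected exactly one decoded message");
        }
        return decoded.get(0);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("test"));
    }
}
```

In the real test, the same split would presumably wrap the `DirectMessageWriter` setup and the `decoder.decode(ctx, buffer, list)` loop, leaving `testMarshallable()` with a few calls and the final assertions.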




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

