sanpwc commented on code in PR #927:
URL: https://github.com/apache/ignite-3/pull/927#discussion_r920887756


##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadWriteSingleEntryRequest) {
+            var request = (ReadWriteSingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadWriteMultiEntryRequest) {
+            var request = (ReadWriteMultiEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * It is an internal method to applies a RO request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadOnlyCommandInternal(ReadOnlyRequest req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadOnlySingleEntryRequest) {
+            var request = (ReadOnlySingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadOnlyMultyEntryRequest) {
+            var request = (ReadOnlyMultyEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * Process a replication request on the replica.
+     *
+     * @param request Request to replication.
+     * @return Response.
+     */
+    public ReplicaResponse processRequest(ReplicaRequest request) { // define 
proper set of exceptions that might be thrown.
+        assert partitionId.equals(request.partitionId()) : 
IgniteStringFormatter.format(
+                "Request does not match to the replica [reqId={}, 
replicaId={}]", request.partitionId(), partitionId);

Review Comment:
   "Partition mismatch: request does not match the replica 
[request.partitionId={}, replica.partitionId={}]", request.partitionId(), 
partitionId



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadWriteSingleEntryRequest) {
+            var request = (ReadWriteSingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadWriteMultiEntryRequest) {
+            var request = (ReadWriteMultiEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * It is an internal method to applies a RO request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadOnlyCommandInternal(ReadOnlyRequest req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadOnlySingleEntryRequest) {
+            var request = (ReadOnlySingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadOnlyMultyEntryRequest) {
+            var request = (ReadOnlyMultyEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * Process a replication request on the replica.
+     *
+     * @param request Request to replication.
+     * @return Response.
+     */
+    public ReplicaResponse processRequest(ReplicaRequest request) { // define 
proper set of exceptions that might be thrown.
+        assert partitionId.equals(request.partitionId()) : 
IgniteStringFormatter.format(
+                "Request does not match to the replica [reqId={}, 
replicaId={}]", request.partitionId(), partitionId);
+
+        //TODO: Check replica is alive.
+        if (request instanceof ReadWriteRequest) {

Review Comment:
   What's the point of splitting requests into ReadWrite and RO within Replica? 
I believe that in the future we will add a checkLease() step for RW requests; 
however, currently it's entirely up to ReplicaListener to process any kind of 
request within a single entry point.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadWriteSingleEntryRequest) {
+            var request = (ReadWriteSingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadWriteMultiEntryRequest) {
+            var request = (ReadWriteMultiEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * It is an internal method to applies a RO request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadOnlyCommandInternal(ReadOnlyRequest req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadOnlySingleEntryRequest) {
+            var request = (ReadOnlySingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadOnlyMultyEntryRequest) {
+            var request = (ReadOnlyMultyEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * Process a replication request on the replica.
+     *
+     * @param request Request to replication.
+     * @return Response.
+     */
+    public ReplicaResponse processRequest(ReplicaRequest request) { // define 
proper set of exceptions that might be thrown.
+        assert partitionId.equals(request.partitionId()) : 
IgniteStringFormatter.format(
+                "Request does not match to the replica [reqId={}, 
replicaId={}]", request.partitionId(), partitionId);
+
+        //TODO: Check replica is alive.
+        if (request instanceof ReadWriteRequest) {
+            return applyReadWriteCommandInternal((ReadWriteRequest) request);
+        }
+
+        return applyReadOnlyCommandInternal((ReadOnlyRequest) request);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() throws Exception {
+        //Close all resources.

Review Comment:
   What kind of resources do you expect to be closed here?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {

Review Comment:
   I don't think we need to split RW and RO handling either in Replica or in 
ReplicaListener.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaListener.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.message.OperationType;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+
+/**
+ * Replica listener.
+ * TODO:IGNITE-17258 Implement ReplicaListener.
+ */
+public class ReplicaListener {
+    /** Versioned partition storage. */
+    private MvPartitionStorage mvDataStorage;
+
+    // /** Transaction state storage. */
+    // private TxnStorage txnStorage;
+
+    // /** Inmemory lock storage. */
+    // private VolatileLockStorage volatileLockStorage;
+
+    /** Raft client. */
+    private final RaftGroupService raftClient;
+
+    /**
+     * The listener constructor.
+     *
+     * @param mvDataStorage Partition storage.
+     * @param raftClient Raft client.
+     */
+    public ReplicaListener(MvPartitionStorage mvDataStorage, RaftGroupService 
raftClient) {
+        this.mvDataStorage = mvDataStorage;
+        this.raftClient = raftClient;
+    }
+
+    /**
+     * Applies an request for RW transaction.
+     *
+     * @param txId Transaction id.
+     * @param data      Data.
+     * @param opType    Operation type.
+     * @return Result.
+     */
+    public Object applyReadWrite(UUID txId, Object data, OperationType opType) 
{

Review Comment:
   As was mentioned in Replica, why do we need to split RW and RO handling?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadWriteSingleEntryRequest) {
+            var request = (ReadWriteSingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadWriteMultiEntryRequest) {
+            var request = (ReadWriteMultiEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * It is an internal method to applies a RO request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadOnlyCommandInternal(ReadOnlyRequest req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadOnlySingleEntryRequest) {
+            var request = (ReadOnlySingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadOnlyMultyEntryRequest) {
+            var request = (ReadOnlyMultyEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * Process a replication request on the replica.

Review Comment:
   As previously mentioned, not all ReplicaRequests are requests for 
replication.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaListener.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.message.OperationType;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+
+/**
+ * Replica listener.
+ * TODO:IGNITE-17258 Implement ReplicaListener.
+ */
+public class ReplicaListener {
+    /** Versioned partition storage. */
+    private MvPartitionStorage mvDataStorage;
+
+    // /** Transaction state storage. */
+    // private TxnStorage txnStorage;
+
+    // /** Inmemory lock storage. */
+    // private VolatileLockStorage volatileLockStorage;
+
+    /** Raft client. */
+    private final RaftGroupService raftClient;
+
+    /**
+     * The listener constructor.
+     *
+     * @param mvDataStorage Partition storage.
+     * @param raftClient Raft client.
+     */
+    public ReplicaListener(MvPartitionStorage mvDataStorage, RaftGroupService 
raftClient) {
+        this.mvDataStorage = mvDataStorage;
+        this.raftClient = raftClient;
+    }
+
+    /**
+     * Applies an request for RW transaction.
+     *
+     * @param txId Transaction id.
+     * @param data      Data.
+     * @param opType    Operation type.
+     * @return Result.
+     */
+    public Object applyReadWrite(UUID txId, Object data, OperationType opType) 
{

Review Comment:
   txId is not enough for request processing, e.g. for writeIntent resolution. 
Why do we need opType? Class-based type checking seems better to me.
   All in all, instead of applyReadWrite and applyReadOnly let's have 
ReplicaRequest processRequest(ReplicaRequest request). We may also consider 
merging Replica and ReplicaListener if it turns out that Replica doesn't have 
enough generic actions to process.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The replica service.

Review Comment:
   It's not the replica service. 



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The replica service.
+ */
+public class ReplicaManager implements IgniteComponent {
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** Cluster network service. */
+    private final ClusterService clusterNetSvc;
+
+    /** Partition storage. */
+    private final MvPartitionStorage mvDataStorage;
+
+    /** Replicas. */
+    private final ConcurrentHashMap<String, Replica> replicas = new 
ConcurrentHashMap<>();
+
+    /**
+     * Constructor for replica service.
+     *
+     * @param clusterNetSvc                 Cluster network service.
+     * @param mvDataStorage                 Partition storage.
+     */
+    public ReplicaManager(ClusterService clusterNetSvc, MvPartitionStorage 
mvDataStorage) {
+        this.clusterNetSvc = clusterNetSvc;
+        this.mvDataStorage = mvDataStorage;
+
+        
clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
new NetworkMessageHandler() {
+            @Override
+            public void onReceived(NetworkMessage message, NetworkAddress 
senderAddr, @Nullable Long correlationId) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteException(new NodeStoppingException());
+                }
+
+                try {
+                    assert message instanceof ReplicaRequest : 
IgniteStringFormatter.format("Unexpected message [message={}]", message);
+
+                    ReplicaRequest request = (ReplicaRequest) message;
+
+                    Replica replica = replicas.get(request.partitionId());
+
+                    if (replica == null) {
+                        //TODO:IGNITE-17255 Sent an exceptional response to 
the client side when the replica is absent.
+                    }
+
+                    ReplicaResponse resp = replica.processRequest(request);
+
+                    clusterNetSvc.messagingService().respond(senderAddr, resp, 
correlationId);
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
+    }
+
+    /**
+     * Gets a replica by its partition id.
+     *
+     * @param partitionId Partition id.
+     * @return Instance of the replica or {@code null} if the replica is not 
started.
+     */
+    public Replica replica(String partitionId) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            return replicas.get(partitionId);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Starts replica server.
+     *
+     * @param partId     Partition id.
+     * @param raftClient Raft client.
+     * @return Replica.
+     */
+    public Replica startReplica(String partId, RaftGroupService raftClient) {

Review Comment:
   Here and in other places: the javadoc isn't specific. What will happen if 
there's already a replica with the given partitionId? Is that method thread-safe: 
what will happen if I try to create several listeners for the given partId at 
the same time? We should parameterize the given method with ReplicaListener 
instead of RaftGroupService.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * The service is intended to execute requests on replicas.
+ * TODO:IGNITE-17255 Implement ReplicaService.
+ */
+public class ReplicaService implements AutoCloseable {
+    /** Network timeout. */
+    private static final int RPC_TIMEOUT = 3000;

Review Comment:
   Auf, well let's say that it'll be fixed in IGNITE-17255



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The replica service.
+ */
+public class ReplicaManager implements IgniteComponent {
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** Cluster network service. */
+    private final ClusterService clusterNetSvc;
+
+    /** Partition storage. */
+    private final MvPartitionStorage mvDataStorage;
+
+    /** Replicas. */
+    private final ConcurrentHashMap<String, Replica> replicas = new 
ConcurrentHashMap<>();
+
+    /**
+     * Constructor for replica service.
+     *
+     * @param clusterNetSvc                 Cluster network service.
+     * @param mvDataStorage                 Partition storage.
+     */
+    public ReplicaManager(ClusterService clusterNetSvc, MvPartitionStorage 
mvDataStorage) {
+        this.clusterNetSvc = clusterNetSvc;
+        this.mvDataStorage = mvDataStorage;
+
+        
clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
new NetworkMessageHandler() {
+            @Override
+            public void onReceived(NetworkMessage message, NetworkAddress 
senderAddr, @Nullable Long correlationId) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteException(new NodeStoppingException());
+                }
+
+                try {
+                    assert message instanceof ReplicaRequest : 
IgniteStringFormatter.format("Unexpected message [message={}]", message);
+
+                    ReplicaRequest request = (ReplicaRequest) message;
+
+                    Replica replica = replicas.get(request.partitionId());
+
+                    if (replica == null) {
+                        //TODO:IGNITE-17255 Sent an exceptional response to 
the client side when the replica is absent.
+                    }
+
+                    ReplicaResponse resp = replica.processRequest(request);
+
+                    clusterNetSvc.messagingService().respond(senderAddr, resp, 
correlationId);
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
+    }
+
+    /**
+     * Gets a replica by its partition id.
+     *
+     * @param partitionId Partition id.
+     * @return Instance of the replica or {@code null} if the replica is not 
started.
+     */
+    public Replica replica(String partitionId) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            return replicas.get(partitionId);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Starts replica server.
+     *
+     * @param partId     Partition id.
+     * @param raftClient Raft client.
+     * @return Replica.
+     */
+    public Replica startReplica(String partId, RaftGroupService raftClient) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            return getReplicaInternal(partId, raftClient);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Internal method for start a replica.
+     *
+     * @param partId     Partition id.
+     * @param raftClient Raft client.
+     * @return Replica.
+     */
+    private Replica getReplicaInternal(String partId, RaftGroupService 
raftClient) {
+        var replica = new Replica(partId, clusterNetSvc.topologyService(), new 
ReplicaListener(mvDataStorage, raftClient));
+
+        Replica previous = replicas.putIfAbsent(partId, replica);

Review Comment:
   I'd rather use computeIfAbsent here in order not to create (and thus not to 
close) the new replica if there's one with the same partId.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadWriteSingleEntryRequest) {
+            var request = (ReadWriteSingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadWriteMultiEntryRequest) {
+            var request = (ReadWriteMultiEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * It is an internal method to applies a RO request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadOnlyCommandInternal(ReadOnlyRequest req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadOnlySingleEntryRequest) {
+            var request = (ReadOnlySingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadOnlyMultyEntryRequest) {
+            var request = (ReadOnlyMultyEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadOnly(req.timestamp(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(
+                    IgniteStringFormatter.format("Unsupported request type 
[type={}]", req.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * Process a replication request on the replica.
+     *
+     * @param request Request to replication.
+     * @return Response.
+     */
+    public ReplicaResponse processRequest(ReplicaRequest request) { // define 
proper set of exceptions that might be thrown.

Review Comment:
   Why isn't it CompletableFuture<ReplicaResponse>? It's up to the outer tx 
logic whether to await responses or not.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * The service is intended to execute requests on replicas.
+ * TODO:IGNITE-17255 Implement ReplicaService.
+ */
+public class ReplicaService implements AutoCloseable {
+    /** Network timeout. */
+    private static final int RPC_TIMEOUT = 3000;
+
+    /** Replica manager. */
+    private final ReplicaManager replicaManager;
+
+    /** Message service. */
+    private final MessagingService messagingService;
+
+    /** Local node. */
+    private final ClusterNode localNode;
+
+    /**
+     * The constructor of replica client.
+     *
+     * @param replicaManager Replica manager.
+     * @param messagingService Cluster message service.
+     * @param topologyService Topology service.
+     */
+    public ReplicaService(
+            ReplicaManager replicaManager,
+            MessagingService messagingService,
+            TopologyService topologyService
+    ) {
+        this.replicaManager = replicaManager;
+        this.messagingService = messagingService;
+
+        this.localNode = topologyService.localMember();
+    }
+
+    /**
+     * Sends request to the replica node.
+     *
+     * @param node Cluster node which holds a replica.
+     * @param req  Replica request.
+     * @return Future to response.
+     */
+    private CompletableFuture<ReplicaResponse> sendToReplica(ClusterNode node, 
ReplicaRequest req) {
+        if (localNode.equals(node)) {
+            Replica replica = replicaManager.replica(req.partitionId());
+
+            if (replica == null) {
+                //TODO:IGNITE-17255 Provide an exceptional response when the 
replica is absent.
+            }
+
+            return 
CompletableFuture.completedFuture(replica.processRequest(req));
+        }
+
+        return messagingService.invoke(node.address(), req, 
RPC_TIMEOUT).thenApply(resp -> {
+            assert resp instanceof ReplicaResponse : 
IgniteStringFormatter.format("Unexpected message response [resp={}]", resp);
+
+            return (ReplicaResponse) resp;
+        });
+    }
+
+    /**
+     * Passes a request to replication.
+     *
+     * @param node Cluster node.

Review Comment:
   It's a recipient node.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/MultiEntryResponse.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Replica response.
+ */
+@Transferable(ReplicaMessageGroup.REPLICA_MULTI_RESPONSE)
+public interface MultiEntryResponse extends ReplicaResponse {

Review Comment:
   I believe that we should remove all ReplicaRequests and Responses for now, 
except ReplicaRequest and ReplicaResponse themselves. We will bring them back 
during the ReplicaListener implementation.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * The service is intended to execute requests on replicas.
+ * TODO:IGNITE-17255 Implement ReplicaService.
+ */
+public class ReplicaService implements AutoCloseable {
+    /** Network timeout. */
+    private static final int RPC_TIMEOUT = 3000;
+
+    /** Replica manager. */
+    private final ReplicaManager replicaManager;
+
+    /** Message service. */
+    private final MessagingService messagingService;
+
+    /** Local node. */
+    private final ClusterNode localNode;
+
+    /**
+     * The constructor of replica client.
+     *
+     * @param replicaManager Replica manager.
+     * @param messagingService Cluster message service.
+     * @param topologyService Topology service.
+     */
+    public ReplicaService(
+            ReplicaManager replicaManager,
+            MessagingService messagingService,
+            TopologyService topologyService
+    ) {
+        this.replicaManager = replicaManager;
+        this.messagingService = messagingService;
+
+        this.localNode = topologyService.localMember();
+    }
+
+    /**
+     * Sends request to the replica node.
+     *
+     * @param node Cluster node which holds a replica.
+     * @param req  Replica request.
+     * @return Future to response.
+     */
+    private CompletableFuture<ReplicaResponse> sendToReplica(ClusterNode node, 
ReplicaRequest req) {
+        if (localNode.equals(node)) {
+            Replica replica = replicaManager.replica(req.partitionId());
+
+            if (replica == null) {
+                //TODO:IGNITE-17255 Provide an exceptional response when the 
replica is absent.
+            }
+
+            return 
CompletableFuture.completedFuture(replica.processRequest(req));
+        }
+
+        return messagingService.invoke(node.address(), req, 
RPC_TIMEOUT).thenApply(resp -> {
+            assert resp instanceof ReplicaResponse : 
IgniteStringFormatter.format("Unexpected message response [resp={}]", resp);
+
+            return (ReplicaResponse) resp;
+        });
+    }
+
+    /**
+     * Passes a request to replication.
+     *
+     * @param node Cluster node.
+     * @param request Request.
+     * @return Future to response.
+     */
+    public CompletableFuture<ReplicaResponse> invoke(ClusterNode node, 
ReplicaRequest request) {

Review Comment:
   What will happen if RPC_TIMEOUT expires? Please declare all possible 
exceptions. It's fine that we don't know the full set of exceptions for now, but 
let's declare the ones that are already thrown: TimeoutException(?), 
IgniteInternalException, NodeStoppingException (if it's possible), etc.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+import org.apache.ignite.network.annotations.MessageGroup;
+
+/**
+ * Message group for the replication process.
+ */
+@MessageGroup(groupType = 8, groupName = "ReplicaMessages")
+public class ReplicaMessageGroup {

Review Comment:
   After finalizing the set of replica requests and responses, let's adjust 
this class accordingly.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadWriteSingleEntryRequest) {
+            var request = (ReadWriteSingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadWriteMultiEntryRequest) {
+            var request = (ReadWriteMultiEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()

Review Comment:
   Why is it expected that a multiEntryRequest will always produce a 
multiEntryResponse? E.g. I believe that operations like remove([]keys) and 
similar may have a void response.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadWriteSingleEntryRequest) {

Review Comment:
   Neither do I think that we need splitting for single/multi entry requests. 
What's the point?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadWriteSingleEntryRequest) {
+            var request = (ReadWriteSingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadWriteMultiEntryRequest) {
+            var request = (ReadWriteMultiEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)

Review Comment:
   Do we really need to return partitionId back in the response?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicatorUtils.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This is an utility class for serialization cache tuples. It will be removed 
after another way for serialization is implemented into the
+ * network layer.
+ * TODO: Remove it after (IGNITE-14793)
+ */
+public class ReplicatorUtils {

Review Comment:
   Is it possible to reuse ByteUtils or similar instead of adding a new one? 
If some methods are missing in the current version of ByteUtils, we might add 
them.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import org.apache.ignite.internal.replicator.message.ReadOnlyMultyEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadOnlyRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlySingleEntryRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteMultiEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReadWriteRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadWriteSingleEntryRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import 
org.apache.ignite.internal.replicator.message.container.MultiEntryContainer;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Replica server.
+ * TODO:IGNITE-17257 Implement Replica server-side logic.
+ */
+public class Replica implements AutoCloseable {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    /** Replica group identity, this id is the same as the considered 
partition's id. */
+    private final String partitionId;
+
+    /** Local cluster node. */
+    private final ClusterNode localNode;
+
+    /** Replica listener. */
+    private final ReplicaListener listener;
+
+    /**
+     * The constructor of replica server.
+     *
+     * @param partitionId Identity.
+     * @param topologyService Topology service.
+     * @param listener Replica listener.
+     */
+    public Replica(
+            String partitionId,
+            TopologyService topologyService,
+            ReplicaListener listener
+    ) {
+        this.partitionId = partitionId;
+        this.localNode = topologyService.localMember();
+        this.listener = listener;
+    }
+
+    /**
+     * It is an internal method to applies a RW request to replicator.
+     *
+     * @param req Request to apply in replicator.
+     * @return Response.
+     */
+    private ReplicaResponse applyReadWriteCommandInternal(ReadWriteRequest 
req) {
+        //TODO: Check locks here.
+        if (req instanceof ReadWriteSingleEntryRequest) {
+            var request = (ReadWriteSingleEntryRequest) req;
+
+            var data = (SingleEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .singleEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else if (req instanceof ReadWriteMultiEntryRequest) {
+            var request = (ReadWriteMultiEntryRequest) req;
+
+            var data = (MultiEntryContainer) 
listener.applyReadWrite(req.txId(), request.entryContainer(), req.opType());
+
+            return REPLICA_MESSAGES_FACTORY
+                    .multiEntryResponse()
+                    .partitionId(partitionId)
+                    .entryContainer(data)
+                    .build();
+        } else {
+            throw new IgniteInternalException(

Review Comment:
   As was mentioned above, I believe it's up to the Listener to check whether 
the request has a known type and throw an UnsupportedOperationException otherwise.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaListener.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.message.OperationType;
+import 
org.apache.ignite.internal.replicator.message.container.SingleEntryContainer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+
+/**
+ * Replica listener.
+ * TODO:IGNITE-17258 Implement ReplicaListener.
+ */
+public class ReplicaListener {
+    /** Versioned partition storage. */
+    private MvPartitionStorage mvDataStorage;
+
+    // /** Transaction state storage. */
+    // private TxnStorage txnStorage;
+
+    // /** Inmemory lock storage. */
+    // private VolatileLockStorage volatileLockStorage;
+
+    /** Raft client. */
+    private final RaftGroupService raftClient;
+
+    /**
+     * The listener constructor.
+     *
+     * @param mvDataStorage Partition storage.
+     * @param raftClient Raft client.
+     */
+    public ReplicaListener(MvPartitionStorage mvDataStorage, RaftGroupService 
raftClient) {
+        this.mvDataStorage = mvDataStorage;
+        this.raftClient = raftClient;
+    }
+
+    /**
+     * Applies an request for RW transaction.
+     *
+     * @param txId Transaction id.
+     * @param data      Data.
+     * @param opType    Operation type.
+     * @return Result.
+     */
+    public Object applyReadWrite(UUID txId, Object data, OperationType opType) 
{

Review Comment:
   Not sure whether it's a good idea to return object. BTW, what about 
Serializable?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The replica service.
+ */
+public class ReplicaManager implements IgniteComponent {
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** Cluster network service. */
+    private final ClusterService clusterNetSvc;
+
+    /** Partition storage. */
+    private final MvPartitionStorage mvDataStorage;
+
+    /** Replicas. */
+    private final ConcurrentHashMap<String, Replica> replicas = new 
ConcurrentHashMap<>();
+
+    /**
+     * Constructor for replica service.
+     *
+     * @param clusterNetSvc                 Cluster network service.
+     * @param mvDataStorage                 Partition storage.
+     */
+    public ReplicaManager(ClusterService clusterNetSvc, MvPartitionStorage 
mvDataStorage) {
+        this.clusterNetSvc = clusterNetSvc;
+        this.mvDataStorage = mvDataStorage;
+
+        
clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
new NetworkMessageHandler() {
+            @Override
+            public void onReceived(NetworkMessage message, NetworkAddress 
senderAddr, @Nullable Long correlationId) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteException(new NodeStoppingException());
+                }
+
+                try {
+                    assert message instanceof ReplicaRequest : 
IgniteStringFormatter.format("Unexpected message [message={}]", message);
+
+                    ReplicaRequest request = (ReplicaRequest) message;
+
+                    Replica replica = replicas.get(request.partitionId());
+
+                    if (replica == null) {
+                        //TODO:IGNITE-17255 Sent an exceptional response to 
the client side when the replica is absent.
+                    }
+
+                    ReplicaResponse resp = replica.processRequest(request);

Review Comment:
   What about exception handling?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The replica service.
+ */
+public class ReplicaManager implements IgniteComponent {
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** Cluster network service. */
+    private final ClusterService clusterNetSvc;
+
+    /** Partition storage. */
+    private final MvPartitionStorage mvDataStorage;
+
+    /** Replicas. */
+    private final ConcurrentHashMap<String, Replica> replicas = new 
ConcurrentHashMap<>();
+
+    /**
+     * Constructor for replica service.
+     *
+     * @param clusterNetSvc                 Cluster network service.
+     * @param mvDataStorage                 Partition storage.
+     */
+    public ReplicaManager(ClusterService clusterNetSvc, MvPartitionStorage 
mvDataStorage) {

Review Comment:
   That won't work: partitions might have different mvDataStorages, and not 
only mvDataStorages actually. E.g. table1.partitionX will have RocksDbStorage1, 
table2.partitionY will have RocksDbStorage2, table3.partitionZ will have 
In-memory MVPartitionStorage, table4.partitionK will have PageMemory 
MVPartitionStorage, etc.
   All in all, storages will parameterize replicaListeners that will 
parameterize the startReplica() method.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The replica service.
+ */
+public class ReplicaManager implements IgniteComponent {
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** Cluster network service. */
+    private final ClusterService clusterNetSvc;
+
+    /** Partition storage. */
+    private final MvPartitionStorage mvDataStorage;
+
+    /** Replicas. */
+    private final ConcurrentHashMap<String, Replica> replicas = new 
ConcurrentHashMap<>();
+
+    /**
+     * Constructor for replica service.
+     *
+     * @param clusterNetSvc                 Cluster network service.
+     * @param mvDataStorage                 Partition storage.
+     */
+    public ReplicaManager(ClusterService clusterNetSvc, MvPartitionStorage 
mvDataStorage) {
+        this.clusterNetSvc = clusterNetSvc;
+        this.mvDataStorage = mvDataStorage;
+
+        
clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
new NetworkMessageHandler() {
+            @Override
+            public void onReceived(NetworkMessage message, NetworkAddress 
senderAddr, @Nullable Long correlationId) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteException(new NodeStoppingException());
+                }
+
+                try {
+                    assert message instanceof ReplicaRequest : 
IgniteStringFormatter.format("Unexpected message [message={}]", message);
+
+                    ReplicaRequest request = (ReplicaRequest) message;
+
+                    Replica replica = replicas.get(request.partitionId());
+
+                    if (replica == null) {
+                        //TODO:IGNITE-17255 Sent an exceptional response to 
the client side when the replica is absent.
+                    }
+
+                    ReplicaResponse resp = replica.processRequest(request);
+
+                    clusterNetSvc.messagingService().respond(senderAddr, resp, 
correlationId);
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
+    }
+
+    /**
+     * Gets a replica by its partition id.
+     *
+     * @param partitionId Partition id.
+     * @return Instance of the replica or {@code null} if the replica is not 
started.
+     */
+    public Replica replica(String partitionId) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            return replicas.get(partitionId);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Starts replica server.
+     *
+     * @param partId     Partition id.
+     * @param raftClient Raft client.
+     * @return Replica.
+     */
+    public Replica startReplica(String partId, RaftGroupService raftClient) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            return getReplicaInternal(partId, raftClient);

Review Comment:
   Why is it **get**ReplicaInternal? It seems that startReplicaInternal suits 
better here.
   Btw, why do we need the Internal counterparts at all?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The replica service.
+ */
+public class ReplicaManager implements IgniteComponent {
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** Cluster network service. */
+    private final ClusterService clusterNetSvc;
+
+    /** Partition storage. */
+    private final MvPartitionStorage mvDataStorage;
+
+    /** Replicas. */
+    private final ConcurrentHashMap<String, Replica> replicas = new 
ConcurrentHashMap<>();
+
+    /**
+     * Constructor for replica service.
+     *
+     * @param clusterNetSvc                 Cluster network service.
+     * @param mvDataStorage                 Partition storage.
+     */
+    public ReplicaManager(ClusterService clusterNetSvc, MvPartitionStorage 
mvDataStorage) {
+        this.clusterNetSvc = clusterNetSvc;
+        this.mvDataStorage = mvDataStorage;
+
+        
clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
new NetworkMessageHandler() {
+            @Override
+            public void onReceived(NetworkMessage message, NetworkAddress 
senderAddr, @Nullable Long correlationId) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteException(new NodeStoppingException());
+                }
+
+                try {
+                    assert message instanceof ReplicaRequest : 
IgniteStringFormatter.format("Unexpected message [message={}]", message);
+
+                    ReplicaRequest request = (ReplicaRequest) message;
+
+                    Replica replica = replicas.get(request.partitionId());
+
+                    if (replica == null) {
+                        //TODO:IGNITE-17255 Sent an exceptional response to 
the client side when the replica is absent.
+                    }
+
+                    ReplicaResponse resp = replica.processRequest(request);
+
+                    clusterNetSvc.messagingService().respond(senderAddr, resp, 
correlationId);
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
+    }
+
+    /**
+     * Gets a replica by its partition id.
+     *
+     * @param partitionId Partition id.
+     * @return Instance of the replica or {@code null} if the replica is not 
started.
+     */
+    public Replica replica(String partitionId) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            return replicas.get(partitionId);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Starts replica server.
+     *
+     * @param partId     Partition id.
+     * @param raftClient Raft client.
+     * @return Replica.
+     */
+    public Replica startReplica(String partId, RaftGroupService raftClient) {

Review Comment:
   The Javadoc isn't specific. What will happen if there's already a replica with 
the given partitionId? Is that method thread-safe: what will happen if I try to 
create several listeners for the given partId at the same time? We should 
parameterize the given method with ReplicaListener instead of RaftGroupService.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The replica service.
+ */
+public class ReplicaManager implements IgniteComponent {
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** Cluster network service. */
+    private final ClusterService clusterNetSvc;
+
+    /** Partition storage. */
+    private final MvPartitionStorage mvDataStorage;
+
+    /** Replicas. */
+    private final ConcurrentHashMap<String, Replica> replicas = new 
ConcurrentHashMap<>();
+
+    /**
+     * Constructor for replica service.
+     *
+     * @param clusterNetSvc                 Cluster network service.
+     * @param mvDataStorage                 Partition storage.
+     */
+    public ReplicaManager(ClusterService clusterNetSvc, MvPartitionStorage 
mvDataStorage) {
+        this.clusterNetSvc = clusterNetSvc;
+        this.mvDataStorage = mvDataStorage;
+
+        
clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
new NetworkMessageHandler() {

Review Comment:
   That should go inside the start method instead of the constructor.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * The service is intended to execute requests on replicas.
+ * TODO:IGNITE-17255 Implement ReplicaService.
+ */
+public class ReplicaService implements AutoCloseable {
+    /** Network timeout. */
+    private static final int RPC_TIMEOUT = 3000;
+
+    /** Replica manager. */
+    private final ReplicaManager replicaManager;
+
+    /** Message service. */
+    private final MessagingService messagingService;
+
+    /** Local node. */
+    private final ClusterNode localNode;
+
+    /**
+     * The constructor of replica client.
+     *
+     * @param replicaManager Replica manager.
+     * @param messagingService Cluster message service.
+     * @param topologyService Topology service.
+     */
+    public ReplicaService(
+            ReplicaManager replicaManager,
+            MessagingService messagingService,
+            TopologyService topologyService
+    ) {
+        this.replicaManager = replicaManager;

Review Comment:
   Despite the fact that I said earlier that Manager seems fine here, I'd 
favor using a replica list producer here. Otherwise it'll look like it's 
fine to start and stop replicas from within the client.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * The service is intended to execute requests on replicas.
+ * TODO:IGNITE-17255 Implement ReplicaService.
+ */
+public class ReplicaService implements AutoCloseable {
+    /** Network timeout. */
+    private static final int RPC_TIMEOUT = 3000;
+
+    /** Replica manager. */
+    private final ReplicaManager replicaManager;
+
+    /** Message service. */
+    private final MessagingService messagingService;
+
+    /** Local node. */
+    private final ClusterNode localNode;
+
+    /**
+     * The constructor of replica client.
+     *
+     * @param replicaManager Replica manager.
+     * @param messagingService Cluster message service.
+     * @param topologyService Topology service.
+     */
+    public ReplicaService(
+            ReplicaManager replicaManager,
+            MessagingService messagingService,
+            TopologyService topologyService
+    ) {
+        this.replicaManager = replicaManager;
+        this.messagingService = messagingService;
+
+        this.localNode = topologyService.localMember();
+    }
+
+    /**
+     * Sends request to the replica node.
+     *
+     * @param node Cluster node which holds a replica.
+     * @param req  Replica request.
+     * @return Future to response.
+     */
+    private CompletableFuture<ReplicaResponse> sendToReplica(ClusterNode node, 
ReplicaRequest req) {
+        if (localNode.equals(node)) {
+            Replica replica = replicaManager.replica(req.partitionId());
+
+            if (replica == null) {
+                //TODO:IGNITE-17255 Provide an exceptional response when the 
replica is absent.
+            }
+
+            return 
CompletableFuture.completedFuture(replica.processRequest(req));
+        }
+
+        return messagingService.invoke(node.address(), req, 
RPC_TIMEOUT).thenApply(resp -> {
+            assert resp instanceof ReplicaResponse : 
IgniteStringFormatter.format("Unexpected message response [resp={}]", resp);
+
+            return (ReplicaResponse) resp;
+        });
+    }
+
+    /**
+     * Passes a request to replication.
+     *
+     * @param node Cluster node.
+     * @param request Request.
+     * @return Future to response.
+     */
+    public CompletableFuture<ReplicaResponse> invoke(ClusterNode node, 
ReplicaRequest request) {
+        //TODO: Check replica is alive.
+        return sendToReplica(node, request);
+    }
+
+    /**
+     * Passes a request to replication.
+     *
+     * @param node Cluster node.
+     * @param request Request.
+     * @param storageId Storage id.
+     * @return Future to response.
+     */
+    public CompletableFuture<ReplicaResponse> invoke(ClusterNode node, 
ReplicaRequest request, String storageId) {
+        //TODO: Check replica is alive.
+        return sendToReplica(node, request);
+
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() throws Exception {
+        //Free the resources.

Review Comment:
   What kind of resources do you expect to close here?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * The service is intended to execute requests on replicas.
+ * TODO:IGNITE-17255 Implement ReplicaService.
+ */
+public class ReplicaService implements AutoCloseable {
+    /** Network timeout. */
+    private static final int RPC_TIMEOUT = 3000;
+
+    /** Replica manager. */
+    private final ReplicaManager replicaManager;
+
+    /** Message service. */
+    private final MessagingService messagingService;
+
+    /** Local node. */
+    private final ClusterNode localNode;
+
+    /**
+     * The constructor of replica client.
+     *
+     * @param replicaManager Replica manager.
+     * @param messagingService Cluster message service.
+     * @param topologyService Topology service.
+     */
+    public ReplicaService(
+            ReplicaManager replicaManager,
+            MessagingService messagingService,
+            TopologyService topologyService
+    ) {
+        this.replicaManager = replicaManager;
+        this.messagingService = messagingService;
+
+        this.localNode = topologyService.localMember();
+    }
+
+    /**
+     * Sends request to the replica node.
+     *
+     * @param node Cluster node which holds a replica.
+     * @param req  Replica request.
+     * @return Future to response.
+     */
+    private CompletableFuture<ReplicaResponse> sendToReplica(ClusterNode node, 
ReplicaRequest req) {
+        if (localNode.equals(node)) {
+            Replica replica = replicaManager.replica(req.partitionId());
+
+            if (replica == null) {
+                //TODO:IGNITE-17255 Provide an exceptional response when the 
replica is absent.
+            }
+
+            return 
CompletableFuture.completedFuture(replica.processRequest(req));

Review Comment:
   completedFuture, why? I believe we may just propagate the future returned by 
replica.processRequest.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * The service is intended to execute requests on replicas.
+ * TODO:IGNITE-17255 Implement ReplicaService.
+ */
+public class ReplicaService implements AutoCloseable {
+    /** Network timeout. */
+    private static final int RPC_TIMEOUT = 3000;
+
+    /** Replica manager. */
+    private final ReplicaManager replicaManager;
+
+    /** Message service. */
+    private final MessagingService messagingService;
+
+    /** Local node. */
+    private final ClusterNode localNode;
+
+    /**
+     * The constructor of replica client.
+     *
+     * @param replicaManager Replica manager.
+     * @param messagingService Cluster message service.
+     * @param topologyService Topology service.
+     */
+    public ReplicaService(
+            ReplicaManager replicaManager,
+            MessagingService messagingService,
+            TopologyService topologyService
+    ) {
+        this.replicaManager = replicaManager;
+        this.messagingService = messagingService;
+
+        this.localNode = topologyService.localMember();
+    }
+
+    /**
+     * Sends request to the replica node.
+     *
+     * @param node Cluster node which holds a replica.
+     * @param req  Replica request.
+     * @return Future to response.
+     */
+    private CompletableFuture<ReplicaResponse> sendToReplica(ClusterNode node, 
ReplicaRequest req) {
+        if (localNode.equals(node)) {
+            Replica replica = replicaManager.replica(req.partitionId());
+
+            if (replica == null) {
+                //TODO:IGNITE-17255 Provide an exceptional response when the 
replica is absent.
+            }
+
+            return 
CompletableFuture.completedFuture(replica.processRequest(req));
+        }
+
+        return messagingService.invoke(node.address(), req, 
RPC_TIMEOUT).thenApply(resp -> {
+            assert resp instanceof ReplicaResponse : 
IgniteStringFormatter.format("Unexpected message response [resp={}]", resp);
+
+            return (ReplicaResponse) resp;
+        });
+    }
+
+    /**
+     * Passes a request to replication.
+     *
+     * @param node Cluster node.
+     * @param request Request.
+     * @return Future to response.
+     */
+    public CompletableFuture<ReplicaResponse> invoke(ClusterNode node, 
ReplicaRequest request) {
+        //TODO: Check replica is alive.

Review Comment:
   Please add corresponding ticket.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/OperationType.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+import java.io.Serializable;
+
+/**
+ * Transaction operation type.
+ */
+public enum OperationType implements Serializable {

Review Comment:
   I don't think that we need given enum, please check my comment above.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaResponse.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+import org.apache.ignite.network.NetworkMessage;
+
+/**
+ * Replica response interface.
+ */
+public interface ReplicaResponse extends NetworkMessage {
+    /**
+     * Gets a partition id.
+     *
+     * @return Partition id.
+     */
+    String partitionId();

Review Comment:
   Do we really need replicationGroupId in response? Why?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+import org.apache.ignite.network.NetworkMessage;
+
+/**
+ * Replica request.
+ */
+public interface ReplicaRequest extends NetworkMessage {
+    /**
+     * Gets a partition id.
+     *
+     * @return Partition id.
+     */
+    String partitionId();

Review Comment:
   MetaStorage ReplicaRequests won't have partitionId(); it's rather 
replicationGroupId(). My fault(( Sorry for that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to