alievmirza commented on code in PR #1612:
URL: https://github.com/apache/ignite-3/pull/1612#discussion_r1094106943


##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported 
snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 
16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = 
BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = 
BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new 
ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new 
ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 
3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        
doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(),
 any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer 
server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = 
mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> 
partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {
+            TxManager txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), hybridClock);
+            txManagers.put(i, txManager);
+        }
 
-        var table = new InternalTableImpl(
+        table = new InternalTableImpl(
                 "table",
                 UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 consistentIdToNode,
-                txManager,
+                txManagers.get(0),
                 mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
+                new TestTxStateTableStorage(),
                 replicaService,
-                mock(HybridClock.class)
+                hybridClock
         );
 
+        closeables.add(() -> table.close());
+
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void afterFollowerStop(RaftGroupService service, RaftServer server) 
throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
+    private PartitionReplicaListener 
mockPartitionReplicaListener(RaftGroupService service) {
+        PartitionReplicaListener partitionReplicaListener = 
mock(PartitionReplicaListener.class);
+
+        
when(partitionReplicaListener.invoke(any())).thenAnswer(invocationOnMock -> {
+            ReplicaRequest req = invocationOnMock.getArgument(0);
+
+            if (req instanceof ReadWriteSingleRowReplicaRequest) {
+                ReadWriteSingleRowReplicaRequest req0 = 
(ReadWriteSingleRowReplicaRequest) req;
+
+                if (req0.requestType() == RequestType.RW_GET) {
+                    int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
+                    MvPartitionStorage partitionStorage = 
mvPartitionStorages.get(storageIndex);
+
+                    Map<ByteBuffer, RowId> primaryIndex = 
rowsToRowIds(partitionStorage);
+                    RowId rowId = 
primaryIndex.get(req0.binaryRow().keySlice());
+                    BinaryRow row = 
rowConverter.fromTuple(partitionStorage.read(rowId, 
HybridTimestamp.MAX_VALUE).tableRow().tupleSlice());
+
+                    return completedFuture(row);
+                }
+
+                // Non-null binary row if UPSERT, otherwise it's implied that 
request type is DELETE.
+                BinaryRow binaryRow = req0.requestType() == 
RequestType.RW_UPSERT ? req0.binaryRow() : null;
+                TableRow tableRow = binaryRow == null ? null : 
TableRowConverter.fromBinaryRow(binaryRow, rowConverter);
+
+                UpdateCommand cmd = msgFactory.updateCommand()
+                        .txId(req0.transactionId())
+                        .tablePartitionId(tablePartitionId(new 
TablePartitionId(UUID.randomUUID(), 0)))
+                        .rowUuid(new RowId(0).uuid())
+                        .rowBuffer(tableRow == null ? null : 
tableRow.byteBuffer())
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd);
+            } else if (req instanceof TxFinishReplicaRequest) {
+                TxFinishReplicaRequest req0 = (TxFinishReplicaRequest) req;
+
+                FinishTxCommand cmd = msgFactory.finishTxCommand()
+                        .txId(req0.txId())
+                        .commit(req0.commit())
+                        
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                        .tablePartitionIds(asList(tablePartitionId(new 
TablePartitionId(UUID.randomUUID(), 0))))
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd)
+                        .thenCompose(ignored -> {
+                            TxCleanupCommand cleanupCmd = 
msgFactory.txCleanupCommand()
+                                    .txId(req0.txId())
+                                    .commit(req0.commit())
+                                    
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                                    
.safeTime(hybridTimestamp(hybridClock.now()))
+                                    .build();
+
+                            return service.run(cleanupCmd);
+                        });
+            }
 
-        managers.add(txManager);
+            throw new AssertionError("Unexpected request: " + req);
+        });
 
-        txManager.start();
+        return partitionReplicaListener;
+    }
 
-        var table = new InternalTableImpl(
-                "table",
-                UUID.randomUUID(),
-                Int2ObjectMaps.singleton(0, service),
-                1,
-                consistentIdToNode,
-                txManager,
-                mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
-                replicaService,
-                mock(HybridClock.class)
-        );
+    /**
+     * Method to convert from {@link HybridTimestamp} object to 
NetworkMessage-based {@link HybridTimestampMessage} object.
+     *
+     * @param tmstmp {@link HybridTimestamp} object to convert to {@link 
HybridTimestampMessage}.
+     * @return {@link HybridTimestampMessage} object obtained from {@link 
HybridTimestamp}.
+     */
+    private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {

Review Comment:
   let not use abbreviation here 



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported 
snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 
16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = 
BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = 
BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new 
ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new 
ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 
3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        
doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(),
 any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer 
server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = 
mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> 
partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {

Review Comment:
   We definitely should take the number of nodes from the test configuration. 



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported 
snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 
16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = 
BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = 
BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new 
ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new 
ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 
3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        
doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(),
 any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer 
server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = 
mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> 
partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {
+            TxManager txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), hybridClock);
+            txManagers.put(i, txManager);
+        }
 
-        var table = new InternalTableImpl(
+        table = new InternalTableImpl(
                 "table",
                 UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 consistentIdToNode,
-                txManager,
+                txManagers.get(0),
                 mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
+                new TestTxStateTableStorage(),
                 replicaService,
-                mock(HybridClock.class)
+                hybridClock
         );
 
+        closeables.add(() -> table.close());
+
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void afterFollowerStop(RaftGroupService service, RaftServer server) 
throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
+    private PartitionReplicaListener 
mockPartitionReplicaListener(RaftGroupService service) {
+        PartitionReplicaListener partitionReplicaListener = 
mock(PartitionReplicaListener.class);
+
+        
when(partitionReplicaListener.invoke(any())).thenAnswer(invocationOnMock -> {
+            ReplicaRequest req = invocationOnMock.getArgument(0);
+
+            if (req instanceof ReadWriteSingleRowReplicaRequest) {
+                ReadWriteSingleRowReplicaRequest req0 = 
(ReadWriteSingleRowReplicaRequest) req;
+
+                if (req0.requestType() == RequestType.RW_GET) {
+                    int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
+                    MvPartitionStorage partitionStorage = 
mvPartitionStorages.get(storageIndex);
+
+                    Map<ByteBuffer, RowId> primaryIndex = 
rowsToRowIds(partitionStorage);
+                    RowId rowId = 
primaryIndex.get(req0.binaryRow().keySlice());
+                    BinaryRow row = 
rowConverter.fromTuple(partitionStorage.read(rowId, 
HybridTimestamp.MAX_VALUE).tableRow().tupleSlice());
+
+                    return completedFuture(row);
+                }
+
+                // Non-null binary row if UPSERT, otherwise it's implied that 
request type is DELETE.
+                BinaryRow binaryRow = req0.requestType() == 
RequestType.RW_UPSERT ? req0.binaryRow() : null;
+                TableRow tableRow = binaryRow == null ? null : 
TableRowConverter.fromBinaryRow(binaryRow, rowConverter);
+
+                UpdateCommand cmd = msgFactory.updateCommand()
+                        .txId(req0.transactionId())
+                        .tablePartitionId(tablePartitionId(new 
TablePartitionId(UUID.randomUUID(), 0)))
+                        .rowUuid(new RowId(0).uuid())
+                        .rowBuffer(tableRow == null ? null : 
tableRow.byteBuffer())
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd);
+            } else if (req instanceof TxFinishReplicaRequest) {
+                TxFinishReplicaRequest req0 = (TxFinishReplicaRequest) req;
+
+                FinishTxCommand cmd = msgFactory.finishTxCommand()
+                        .txId(req0.txId())
+                        .commit(req0.commit())
+                        
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                        .tablePartitionIds(asList(tablePartitionId(new 
TablePartitionId(UUID.randomUUID(), 0))))
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd)
+                        .thenCompose(ignored -> {
+                            TxCleanupCommand cleanupCmd = 
msgFactory.txCleanupCommand()
+                                    .txId(req0.txId())
+                                    .commit(req0.commit())
+                                    
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                                    
.safeTime(hybridTimestamp(hybridClock.now()))
+                                    .build();
+
+                            return service.run(cleanupCmd);
+                        });
+            }
 
-        managers.add(txManager);
+            throw new AssertionError("Unexpected request: " + req);
+        });
 
-        txManager.start();
+        return partitionReplicaListener;
+    }
 
-        var table = new InternalTableImpl(
-                "table",
-                UUID.randomUUID(),
-                Int2ObjectMaps.singleton(0, service),
-                1,
-                consistentIdToNode,
-                txManager,
-                mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
-                replicaService,
-                mock(HybridClock.class)
-        );
+    /**
+     * Method to convert from {@link HybridTimestamp} object to 
NetworkMessage-based {@link HybridTimestampMessage} object.
+     *
+     * @param tmstmp {@link HybridTimestamp} object to convert to {@link 
HybridTimestampMessage}.
+     * @return {@link HybridTimestampMessage} object obtained from {@link 
HybridTimestamp}.
+     */
+    private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
+        return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()

Review Comment:
   Why couldn't we reuse 
`org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener#hybridTimestamp`?



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported 
snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 
16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = 
BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = 
BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new 
ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new 
ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 
3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        
doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(),
 any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer 
server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = 
mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> 
partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {
+            TxManager txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), hybridClock);
+            txManagers.put(i, txManager);
+        }
 
-        var table = new InternalTableImpl(
+        table = new InternalTableImpl(
                 "table",
                 UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 consistentIdToNode,
-                txManager,
+                txManagers.get(0),
                 mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
+                new TestTxStateTableStorage(),
                 replicaService,
-                mock(HybridClock.class)
+                hybridClock
         );
 
+        closeables.add(() -> table.close());
+
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void afterFollowerStop(RaftGroupService service, RaftServer server) 
throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
+    private PartitionReplicaListener 
mockPartitionReplicaListener(RaftGroupService service) {
+        PartitionReplicaListener partitionReplicaListener = 
mock(PartitionReplicaListener.class);
+
+        
when(partitionReplicaListener.invoke(any())).thenAnswer(invocationOnMock -> {
+            ReplicaRequest req = invocationOnMock.getArgument(0);
+
+            if (req instanceof ReadWriteSingleRowReplicaRequest) {
+                ReadWriteSingleRowReplicaRequest req0 = 
(ReadWriteSingleRowReplicaRequest) req;
+
+                if (req0.requestType() == RequestType.RW_GET) {
+                    int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
+                    MvPartitionStorage partitionStorage = 
mvPartitionStorages.get(storageIndex);
+
+                    Map<ByteBuffer, RowId> primaryIndex = 
rowsToRowIds(partitionStorage);
+                    RowId rowId = 
primaryIndex.get(req0.binaryRow().keySlice());
+                    BinaryRow row = 
rowConverter.fromTuple(partitionStorage.read(rowId, 
HybridTimestamp.MAX_VALUE).tableRow().tupleSlice());
+
+                    return completedFuture(row);
+                }
+
+                // Non-null binary row if UPSERT, otherwise it's implied that 
request type is DELETE.
+                BinaryRow binaryRow = req0.requestType() == 
RequestType.RW_UPSERT ? req0.binaryRow() : null;
+                TableRow tableRow = binaryRow == null ? null : 
TableRowConverter.fromBinaryRow(binaryRow, rowConverter);
+
+                UpdateCommand cmd = msgFactory.updateCommand()
+                        .txId(req0.transactionId())
+                        .tablePartitionId(tablePartitionId(new 
TablePartitionId(UUID.randomUUID(), 0)))
+                        .rowUuid(new RowId(0).uuid())
+                        .rowBuffer(tableRow == null ? null : 
tableRow.byteBuffer())
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd);
+            } else if (req instanceof TxFinishReplicaRequest) {
+                TxFinishReplicaRequest req0 = (TxFinishReplicaRequest) req;
+
+                FinishTxCommand cmd = msgFactory.finishTxCommand()
+                        .txId(req0.txId())
+                        .commit(req0.commit())
+                        
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                        .tablePartitionIds(asList(tablePartitionId(new 
TablePartitionId(UUID.randomUUID(), 0))))
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd)
+                        .thenCompose(ignored -> {
+                            TxCleanupCommand cleanupCmd = 
msgFactory.txCleanupCommand()
+                                    .txId(req0.txId())
+                                    .commit(req0.commit())
+                                    
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                                    
.safeTime(hybridTimestamp(hybridClock.now()))
+                                    .build();
+
+                            return service.run(cleanupCmd);
+                        });
+            }
 
-        managers.add(txManager);
+            throw new AssertionError("Unexpected request: " + req);
+        });
 
-        txManager.start();
+        return partitionReplicaListener;
+    }
 
-        var table = new InternalTableImpl(
-                "table",
-                UUID.randomUUID(),
-                Int2ObjectMaps.singleton(0, service),
-                1,
-                consistentIdToNode,
-                txManager,
-                mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
-                replicaService,
-                mock(HybridClock.class)
-        );
+    /**
+     * Method to convert from {@link HybridTimestamp} object to 
NetworkMessage-based {@link HybridTimestampMessage} object.
+     *
+     * @param tmstmp {@link HybridTimestamp} object to convert to {@link 
HybridTimestampMessage}.
+     * @return {@link HybridTimestampMessage} object obtained from {@link 
HybridTimestamp}.
+     */
+    private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
+        return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()
+                .physical(tmstmp.getPhysical())
+                .logical(tmstmp.getLogical())
+                .build()
+                : null;
+    }
 
+    /**
+     * Method to convert from {@link TablePartitionId} object to command-based 
{@link TablePartitionIdMessage} object.
+     *
+     * @param tablePartId {@link TablePartitionId} object to convert to {@link 
TablePartitionIdMessage}.
+     * @return {@link TablePartitionIdMessage} object converted from argument.
+     */
+    private TablePartitionIdMessage tablePartitionId(TablePartitionId 
tablePartId) {

Review Comment:
   The same, lets make a static method that accepts message factories and use 
general logic from 
`org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener#tablePartitionId`



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported 
snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 
16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = 
BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = 
BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new 
ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new 
ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new 
ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 
3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        
doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(),
 any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer 
server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = 
mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> 
partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {
+            TxManager txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), hybridClock);
+            txManagers.put(i, txManager);
+        }
 
-        var table = new InternalTableImpl(
+        table = new InternalTableImpl(
                 "table",
                 UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 consistentIdToNode,
-                txManager,
+                txManagers.get(0),

Review Comment:
   is it ok that we pass here txManager with index 0, but node with this 
manager could be stopped?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to