ibessonov commented on code in PR #1226:
URL: https://github.com/apache/ignite-3/pull/1226#discussion_r1001623608


##########
modules/core/src/main/java/org/apache/ignite/internal/lock/AutoLockup.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.lock;
+
+/**
+ * Represents a lockup (this is an aquisition and owning of a lock like {@link 
java.util.concurrent.locks.Lock})

Review Comment:
   This is a weirdly specific description for a thing that only removes an 
exception from the signature. It doesn't have to represent locks. So why have 
you decided to make it about locks exclusively?



##########
modules/core/src/test/java/org/apache/ignite/internal/lock/ReusableLockLockupTest.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.lock;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.locks.ReentrantLock;
+import org.junit.jupiter.api.Test;
+
+class ReusableLockLockupTest {
+    private final ReentrantLock lock = new ReentrantLock();
+
+    @SuppressWarnings("resource")
+    @Test
+    void doesNotLockTheLockOnCreation() {
+        ReusableLockLockup ignored = new ReusableLockLockup(lock);
+
+        assertFalse(lock.isLocked());
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    void locksTheLockOnAcquire() {
+        try (AutoLockup ignored = new ReusableLockLockup(lock).acquireLock()) {
+            assertTrue(lock.isLocked());
+        }
+    }
+
+    @SuppressWarnings({"resource", "EmptyTryBlock"})
+    @Test
+    void unlocksTheLockOnClose() {
+        try (AutoLockup ignored = new ReusableLockLockup(lock).acquireLock()) {
+            // No-op.
+        }
+
+        assertFalse(lock.isLocked());
+    }
+}

Review Comment:
   Please add an empty line at the end of the file.



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/TestPartitionDataStorage.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lock.AutoLockup;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test implementation of {@link PartitionDataStorage}.
+ */
+public class TestPartitionDataStorage implements PartitionDataStorage {
+    private final MvPartitionStorage partitionStorage;
+
+    private final Lock partitionSnapshotsLock = new ReentrantLock();
+
+    public TestPartitionDataStorage(MvPartitionStorage partitionStorage) {
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public <V> V runConsistently(WriteClosure<V> closure) throws 
StorageException {
+        return partitionStorage.runConsistently(closure);
+    }
+
+    @SuppressWarnings("Convert2Lambda")
+    @Override
+    public AutoLockup acquirePartitionSnapshotsReadLock() {
+        partitionSnapshotsLock.lock();
+
+        return new AutoLockup() {
+            @Override
+            public void close() {
+                partitionSnapshotsLock.unlock();
+            }
+        };
+    }
+
+    @Override
+    public CompletableFuture<Void> flush() {
+        return partitionStorage.flush();
+    }
+
+    @Override
+    public long lastAppliedIndex() {
+        return partitionStorage.lastAppliedIndex();
+    }
+
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) throws 
StorageException {
+        partitionStorage.lastAppliedIndex(lastAppliedIndex);
+    }
+
+    @Override
+    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, 
UUID txId, UUID commitTableId,
+            int commitPartitionId) throws TxIdMismatchException, 
StorageException {
+        return partitionStorage.addWrite(rowId, row, txId, commitTableId, 
commitPartitionId);
+    }
+
+    @Override
+    public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException 
{
+        return partitionStorage.abortWrite(rowId);
+    }
+
+    @Override
+    public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
+        partitionStorage.commitWrite(rowId, timestamp);
+    }
+
+    @Override
+    public MvPartitionStorage getStorage() {
+        return partitionStorage;
+    }
+
+    @Override
+    public void close() throws Exception {

Review Comment:
   I don't like this method, but I know that you have nothing to do with it. In 
my view, storages must be closed by the outside code when the raft group is 
stopped. Raft listeners didn't instantiate the storages, so they don't need to 
destroy them.



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/TestPartitionDataStorage.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lock.AutoLockup;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test implementation of {@link PartitionDataStorage}.
+ */
+public class TestPartitionDataStorage implements PartitionDataStorage {
+    private final MvPartitionStorage partitionStorage;
+
+    private final Lock partitionSnapshotsLock = new ReentrantLock();
+
+    public TestPartitionDataStorage(MvPartitionStorage partitionStorage) {
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public <V> V runConsistently(WriteClosure<V> closure) throws 
StorageException {
+        return partitionStorage.runConsistently(closure);
+    }
+
+    @SuppressWarnings("Convert2Lambda")
+    @Override
+    public AutoLockup acquirePartitionSnapshotsReadLock() {
+        partitionSnapshotsLock.lock();
+
+        return new AutoLockup() {
+            @Override
+            public void close() {
+                partitionSnapshotsLock.unlock();
+            }
+        };
+    }
+
+    @Override
+    public CompletableFuture<Void> flush() {
+        return partitionStorage.flush();
+    }
+
+    @Override
+    public long lastAppliedIndex() {
+        return partitionStorage.lastAppliedIndex();
+    }
+
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) throws 
StorageException {
+        partitionStorage.lastAppliedIndex(lastAppliedIndex);
+    }
+
+    @Override
+    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, 
UUID txId, UUID commitTableId,
+            int commitPartitionId) throws TxIdMismatchException, 
StorageException {
+        return partitionStorage.addWrite(rowId, row, txId, commitTableId, 
commitPartitionId);
+    }
+
+    @Override
+    public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException 
{
+        return partitionStorage.abortWrite(rowId);
+    }
+
+    @Override
+    public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
+        partitionStorage.commitWrite(rowId, timestamp);
+    }
+
+    @Override
+    public MvPartitionStorage getStorage() {

Review Comment:
   This should probably be renamed to `getMvStorage`, since there are other storages.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lock.AutoLockup;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Provides access to both MV (multi-version) data and transactions data of a 
partition.
+ *
+ * <p>Methods writing to MV storage ({@link #addWrite(RowId, BinaryRow, UUID, 
UUID, int)}, {@link #abortWrite(RowId)}
+ * and {@link #commitWrite(RowId, HybridTimestamp)}) and TX data storage MUST 
be invoked under a lock acquired using
+ * {@link #acquirePartitionSnapshotsReadLock()}.
+ *
+ * <p>Each MvPartitionStorage instance represents exactly one partition. All 
RowIds within a partition are sorted consistently with the
+ * {@link RowId#compareTo} comparison order.
+ *
+ * @see org.apache.ignite.internal.storage.MvPartitionStorage
+ * @see org.apache.ignite.internal.tx.storage.state.TxStateStorage
+ */
+public interface PartitionDataStorage extends AutoCloseable {
+    /**
+     * Executes {@link WriteClosure} atomically, meaning that partial result 
of an incomplete closure will never be written to the
+     * physical device, thus guaranteeing data consistency after restart. 
Simply runs the closure in case of a volatile storage.
+     *
+     * @param closure Data access closure to be executed.
+     * @param <V> Type of the result returned from the closure.
+     * @return Closure result.
+     * @throws StorageException If failed to write data to the storage.
+     */
+    <V> V runConsistently(WriteClosure<V> closure) throws StorageException;
+
+    /**
+     * Acquires a read lock on partition snapshots.
+     *
+     * @return The acquired lockup. It will be released through {@link 
AutoLockup#close()} invocation.
+     */
+    AutoLockup acquirePartitionSnapshotsReadLock();
+
+    /**
+     * Flushes current state of the data or <i>the state from the nearest 
future</i> to the storage. It means that the future can be
+     * completed when the underlying storage {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#persistedIndex()} is 
higher
+     * than {@link #lastAppliedIndex()} at the moment of the method's call. 
This feature
+     * allows implementing a batch flush for several partitions at once.
+     *
+     * @return Future that's completed when flushing of the data is completed.
+     */
+    CompletableFuture<Void> flush();
+
+    /**
+     * Index of the highest write command applied to the storage. {@code 0} if 
index is unknown.
+     */
+    long lastAppliedIndex();
+
+    /**
+     * Sets the last applied index value.
+     */
+    void lastAppliedIndex(long lastAppliedIndex) throws StorageException;
+
+    /**
+     * Creates (or replaces) an uncommitted (aka pending) version, assigned to 
the given transaction id.
+     * In details:
+     * - if there is no uncommitted version, a new uncommitted version is added
+     * - if there is an uncommitted version belonging to the same transaction, 
it gets replaced by the given version
+     * - if there is an uncommitted version belonging to a different 
transaction, {@link TxIdMismatchException} is thrown
+     *
+     * <p>This must be called under a lock acquired using {@link 
#acquirePartitionSnapshotsReadLock()}.
+     *
+     * @param rowId Row id.
+     * @param row Binary row to update. Key only row means value removal.
+     * @param txId Transaction id.
+     * @param commitTableId Commit table id.
+     * @param commitPartitionId Commit partitionId.
+     * @return Previous uncommitted row version associated with the row id, or 
{@code null} if no uncommitted version
+     *     exists before this call
+     * @throws TxIdMismatchException If there's another pending update 
associated with different transaction id.
+     * @throws StorageException If failed to write data to the storage.
+     */
+    @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID 
txId, UUID commitTableId, int commitPartitionId)
+            throws TxIdMismatchException, StorageException;
+
+    /**
+     * Aborts a pending update of the ongoing uncommitted transaction. Invoked 
during rollback.
+     *
+     * <p>This must be called under a lock acquired using {@link 
#acquirePartitionSnapshotsReadLock()}.
+     *
+     * @param rowId Row id.
+     * @return Previous uncommitted row version associated with the row id.
+     * @throws StorageException If failed to write data to the storage.
+     */
+    @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException;
+
+    /**
+     * Commits a pending update of the ongoing transaction. Invoked during 
commit. Committed value will be versioned by the given timestamp.
+     *
+     * <p>This must be called under a lock acquired using {@link 
#acquirePartitionSnapshotsReadLock()}.
+     *
+     * @param rowId Row id.
+     * @param timestamp Timestamp to associate with committed value.
+     * @throws StorageException If failed to write data to the storage.
+     */
+    void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException;
+
+    /**
+     * Returns the underlying {@link MvPartitionStorage}. Only for tests!
+     *
+     * @return Underlying {@link MvPartitionStorage}.
+     */
+    @TestOnly

Review Comment:
   What's the problem with it being in the signature? Maybe we should allow it 
in non-test usages.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/MvStoragePartitionAccess.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link PartitionAccess} that adapts an {@link MvPartitionStorage}.
+ */
+public class MvStoragePartitionAccess implements PartitionAccess {
+    private final PartitionKey partitionKey;
+    private final MvPartitionStorage partitionStorage;
+
+    public MvStoragePartitionAccess(PartitionKey partitionKey, 
MvPartitionStorage partitionStorage) {
+        this.partitionKey = partitionKey;
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public PartitionKey key() {
+        return partitionKey;
+    }
+
+    @Override
+    public long persistedIndex() {
+        return partitionStorage.persistedIndex();
+    }
+
+    @Override
+    public RowId minRowId() {
+        return RowId.lowestRowId(partitionKey.partitionId());
+    }
+
+    @Override
+    public @Nullable RowId closestRowId(RowId lowerBound) {
+        return partitionStorage.closestRowId(lowerBound);
+    }
+
+    @Override
+    public List<ReadResult> rowVersions(RowId rowId) {
+        try (Cursor<ReadResult> cursor = partitionStorage.scanVersions(rowId)) 
{
+            List<ReadResult> versions = new ArrayList<>();
+
+            for (ReadResult version : cursor) {
+                versions.add(version);
+            }
+
+            return List.copyOf(versions);
+        } catch (Exception e) {
+            // TODO: IGNITE-17935 - handle this?

Review Comment:
   Yes, please :)



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java:
##########
@@ -84,6 +85,16 @@ public long leastSignificantBits() {
         return uuid.getLeastSignificantBits();
     }
 
+    /**
+     * Returns the UUID equivalent of {@link #mostSignificantBits()} and 
{@link #leastSignificantBits()}.
+     *
+     * @return UUID.
+     */
+    @TestOnly
+    public UUID uuid() {

Review Comment:
   We can make this a part of the "legal" API here. It might be useful in other 
situations.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -17,36 +17,269 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 
 /**
- * Outgoing snapshot.
+ * Outgoing snapshot. It corresponds to exactly one partition.
+ *
+ * <p>The snapshot has a lock needed for interaction with {@link 
SnapshotAwarePartitionDataStorage}.
  */
 public class OutgoingSnapshot {
+    private static final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
+
+    private final UUID id;
+
+    private final PartitionAccess partition;
+
+    private final OutgoingSnapshotRegistry outgoingSnapshotRegistry;
+
+    /**
+     * Lock that is used for mutual exclusion of snapshot reading (by this 
class) and threads that write to the same
+     * partition (currently, via {@link SnapshotAwarePartitionDataStorage}).
+     */
+    private final Lock rowOperationsLock = new ReentrantLock();
+
+    /**
+     * {@link RowId}s for which the corresponding rows were sent out of order 
(relative to the order in which this
+     * snapshot sends rows), hence they must be skipped when sending rows 
normally.
+     */
+    private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>();
+
+    // TODO: IGNITE-17935 - manage queue size
+    /**
+     * Rows that need to be sent out of order (relative to the order in which 
this snapshot sends rows).
+     * Versions inside rows are in oldest-to-newest order.
+     */
+    private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData 
= new LinkedList<>();
+
+    /**
+     * {@link RowId} used to point (most of the time) to the last processed 
row. More precisely:
+     *
+     * <ul>
+     *     <li>Before we started to read from the partition, this is equal to 
lowest theoretically possible
+     *     {@link RowId} for this partition</li>
+     *     <li>If we started to read from partition AND it is not yet 
exhausted, this is the last RowId that was
+     *     sent in this snapshot order</li>
+     *     <li>After we exhausted the partition, this is {@code null}</li>
+     * </ul>
+     */
+    private RowId lastRowId;
+
+    private boolean startedToReadPartition = false;
+
+    /**
+     * This becomes {@code true} as soon as we exhaust both the partition and 
out-of-order queue.
+     */
+    private boolean finished = false;
+
+    /**
+     * Creates a new instance.
+     */
+    public OutgoingSnapshot(UUID id, PartitionAccess partition, 
OutgoingSnapshotRegistry outgoingSnapshotRegistry) {
+        this.id = id;
+        this.partition = partition;
+        this.outgoingSnapshotRegistry = outgoingSnapshotRegistry;
+
+        lastRowId = partition.minRowId();
+    }
+
+    /**
+     * Returns the ID of this snapshot.
+     */
+    public UUID id() {
+        return id;
+    }
+
+    /**
+     * Returns the key of the corresponding partition.
+     *
+     * @return Partition key.
+     */
+    public PartitionKey partitionKey() {
+        return partition.key();
+    }
+
     /**
      * Reads a snapshot meta and returns a future with the response.
      *
      * @param metaRequest Meta request.
      */
     CompletableFuture<SnapshotMetaResponse> 
handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         return null;
     }
 
     /**
      * Reads chunk of partition data and returns a future with the response.
      *
-     * @param mvDataRequest Data request.
+     * @param request Data request.
      */
-    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-        return null;
+    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest request) {
+        // TODO: IGNITE-17935 - executor?
+
+        assert !finished;
+
+        long totalBatchSize = 0;
+        List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>();
+
+        while (true) {
+            acquireLock();
+
+            try {
+                totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, 
request);
+
+                totalBatchSize = tryProcessRowFromPartition(batch, 
totalBatchSize, request);
+
+                if (exhaustedPartition() && outOfOrderMvData.isEmpty()) {
+                    finished = true;
+                }
+
+                if (finished || batchIsFull(request, totalBatchSize)) {
+                    break;
+                }
+            } finally {
+                releaseLock();
+            }
+        }
+
+        // We unregister itself outside the lock to avoid a deadlock, because 
SnapshotAwareMvPartitionStorage takes
+        // locks in partitionSnapshots.lock -> snapshot.lock; if we did it 
under lock, we would take locks in the
+        // opposite order. That's why we need finished flag and cooperation 
with SnapshotAwareMvPartitionStorage
+        // (calling our isFinished()).
+        if (finished) {
+            outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id);
+        }
+
+        SnapshotMvDataResponse response = 
messagesFactory.snapshotMvDataResponse()
+                .rows(batch)
+                .finish(finished)
+                .build();
+        return CompletableFuture.completedFuture(response);

Review Comment:
   ```suggestion
                   .build();
   
           return CompletableFuture.completedFuture(response);
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -17,36 +17,269 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 
 /**
- * Outgoing snapshot.
+ * Outgoing snapshot. It corresponds to exactly one partition.
+ *
+ * <p>The snapshot has a lock needed for interaction with {@link 
SnapshotAwarePartitionDataStorage}.
  */
 public class OutgoingSnapshot {
+    private static final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
+
+    private final UUID id;
+
+    private final PartitionAccess partition;
+
+    private final OutgoingSnapshotRegistry outgoingSnapshotRegistry;
+
+    /**
+     * Lock that is used for mutual exclusion of snapshot reading (by this 
class) and threads that write to the same
+     * partition (currently, via {@link SnapshotAwarePartitionDataStorage}).
+     */
+    private final Lock rowOperationsLock = new ReentrantLock();
+
+    /**
+     * {@link RowId}s for which the corresponding rows were sent out of order 
(relative to the order in which this
+     * snapshot sends rows), hence they must be skipped when sending rows 
normally.
+     */
+    private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>();
+
+    // TODO: IGNITE-17935 - manage queue size
+    /**
+     * Rows that need to be sent out of order (relative to the order in which 
this snapshot sends rows).
+     * Versions inside rows are in oldest-to-newest order.
+     */
+    private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData 
= new LinkedList<>();
+
+    /**
+     * {@link RowId} used to point (most of the time) to the last processed 
row. More precisely:
+     *
+     * <ul>
+     *     <li>Before we started to read from the partition, this is equal to 
lowest theoretically possible
+     *     {@link RowId} for this partition</li>
+     *     <li>If we started to read from partition AND it is not yet 
exhausted, this is the last RowId that was
+     *     sent in this snapshot order</li>
+     *     <li>After we exhausted the partition, this is {@code null}</li>
+     * </ul>
+     */
+    private RowId lastRowId;
+
+    private boolean startedToReadPartition = false;
+
+    /**
+     * This becomes {@code true} as soon as we exhaust both the partition and 
out-of-order queue.
+     */
+    private boolean finished = false;
+
+    /**
+     * Creates a new instance.
+     */
+    public OutgoingSnapshot(UUID id, PartitionAccess partition, 
OutgoingSnapshotRegistry outgoingSnapshotRegistry) {
+        this.id = id;
+        this.partition = partition;
+        this.outgoingSnapshotRegistry = outgoingSnapshotRegistry;
+
+        lastRowId = partition.minRowId();
+    }
+
+    /**
+     * Returns the ID of this snapshot.
+     */
+    public UUID id() {
+        return id;
+    }
+
+    /**
+     * Returns the key of the corresponding partition.
+     *
+     * @return Partition key.
+     */
+    public PartitionKey partitionKey() {
+        return partition.key();
+    }
+
     /**
      * Reads a snapshot meta and returns a future with the response.
      *
      * @param metaRequest Meta request.
      */
     CompletableFuture<SnapshotMetaResponse> 
handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         return null;
     }
 
     /**
      * Reads chunk of partition data and returns a future with the response.
      *
-     * @param mvDataRequest Data request.
+     * @param request Data request.
      */
-    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-        return null;
+    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest request) {
+        // TODO: IGNITE-17935 - executor?
+
+        assert !finished;
+
+        long totalBatchSize = 0;
+        List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>();
+
+        while (true) {
+            acquireLock();
+
+            try {
+                totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, 
request);
+
+                totalBatchSize = tryProcessRowFromPartition(batch, 
totalBatchSize, request);
+
+                if (exhaustedPartition() && outOfOrderMvData.isEmpty()) {
+                    finished = true;
+                }
+
+                if (finished || batchIsFull(request, totalBatchSize)) {
+                    break;
+                }
+            } finally {
+                releaseLock();
+            }
+        }
+
+        // We unregister itself outside the lock to avoid a deadlock, because 
SnapshotAwareMvPartitionStorage takes
+        // locks in partitionSnapshots.lock -> snapshot.lock; if we did it 
under lock, we would take locks in the
+        // opposite order. That's why we need finished flag and cooperation 
with SnapshotAwareMvPartitionStorage
+        // (calling our isFinished()).
+        if (finished) {
+            outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id);
+        }
+
+        SnapshotMvDataResponse response = 
messagesFactory.snapshotMvDataResponse()
+                .rows(batch)
+                .finish(finished)
+                .build();
+        return CompletableFuture.completedFuture(response);
+    }
+
+    private long fillWithOutOfOrderRows(
+            List<SnapshotMvDataResponse.ResponseEntry> rowEntries,
+            long totalBytesBefore,
+            SnapshotMvDataRequest request
+    ) {
+        long totalBytesAfter = totalBytesBefore;
+
+        while (totalBytesAfter < request.batchSizeHint()) {
+            SnapshotMvDataResponse.ResponseEntry rowEntry = 
outOfOrderMvData.poll();
+
+            if (rowEntry == null) {
+                break;
+            }
+
+            rowEntries.add(rowEntry);
+            totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions());
+        }
+
+        return totalBytesAfter;
+    }
+
+    private long rowSizeInBytes(List<ByteBuffer> rowVersions) {
+        long sum = 0;
+
+        for (ByteBuffer buf : rowVersions) {
+            if (buf != null) {
+                sum += buf.limit();
+            }
+        }
+
+        return sum;
+    }
+
+    private long 
tryProcessRowFromPartition(List<SnapshotMvDataResponse.ResponseEntry> batch, 
long totalBatchSize,
+            SnapshotMvDataRequest request) {
+        if (batchIsFull(request, totalBatchSize) || exhaustedPartition()) {
+            return totalBatchSize;
+        }
+
+        if (!startedToReadPartition) {
+            lastRowId = partition.closestRowId(lastRowId);
+
+            startedToReadPartition = true;
+        } else {
+            lastRowId = partition.closestRowId(lastRowId.increment());
+        }
+
+        if (!exhaustedPartition()) {
+            if (!overwrittenRowIds.remove(lastRowId)) {
+                List<ReadResult> rowVersions = new 
ArrayList<>(partition.rowVersions(lastRowId));
+                Collections.reverse(rowVersions);
+                SnapshotMvDataResponse.ResponseEntry rowEntry = 
rowEntry(lastRowId, rowVersions);
+
+                batch.add(rowEntry);
+
+                totalBatchSize += rowSizeInBytes(rowEntry.rowVersions());
+            }
+        }
+
+        return totalBatchSize;
+    }
+
+    private boolean batchIsFull(SnapshotMvDataRequest request, long 
totalBatchSize) {
+        return totalBatchSize >= request.batchSizeHint();
+    }
+
+    private boolean exhaustedPartition() {
+        return lastRowId == null;
+    }
+
+    private SnapshotMvDataResponse.ResponseEntry rowEntry(RowId rowId, 
List<ReadResult> rowVersions) {
+        List<ByteBuffer> buffers = new ArrayList<>();

Review Comment:
   Can we pass a size? Or is that a premature optimization? I suspect that 
usually these chains would be pretty short, way shorter than the default 
capacity of 10.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java:
##########
@@ -112,6 +153,7 @@ private void messageHandler(NetworkMessage networkMessage, 
NetworkAddress sender
         CompletableFuture<? extends NetworkMessage> responseFuture = 
handleSnapshotRequestMessage(networkMessage, outgoingSnapshot);
 
         if (responseFuture != null) {
+            //TODO: IGNITE-17935 - whenComplete()? handle()? Should we analyze 
the first exception at all?

Review Comment:
   I guess we should analyze the exception. If something went wrong, we should 
act. I don't know how, though.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -17,36 +17,269 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 
 /**
- * Outgoing snapshot.
+ * Outgoing snapshot. It corresponds to exactly one partition.
+ *
+ * <p>The snapshot has a lock needed for interaction with {@link 
SnapshotAwarePartitionDataStorage}.
  */
 public class OutgoingSnapshot {
+    private static final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
+
+    private final UUID id;
+
+    private final PartitionAccess partition;
+
+    private final OutgoingSnapshotRegistry outgoingSnapshotRegistry;
+
+    /**
+     * Lock that is used for mutual exclusion of snapshot reading (by this 
class) and threads that write to the same
+     * partition (currently, via {@link SnapshotAwarePartitionDataStorage}).
+     */
+    private final Lock rowOperationsLock = new ReentrantLock();
+
+    /**
+     * {@link RowId}s for which the corresponding rows were sent out of order 
(relative to the order in which this
+     * snapshot sends rows), hence they must be skipped when sending rows 
normally.
+     */
+    private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>();
+
+    // TODO: IGNITE-17935 - manage queue size
+    /**
+     * Rows that need to be sent out of order (relative to the order in which 
this snapshot sends rows).
+     * Versions inside rows are in oldest-to-newest order.
+     */
+    private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData 
= new LinkedList<>();
+
+    /**
+     * {@link RowId} used to point (most of the time) to the last processed 
row. More precisely:
+     *
+     * <ul>
+     *     <li>Before we started to read from the partition, this is equal to 
lowest theoretically possible
+     *     {@link RowId} for this partition</li>
+     *     <li>If we started to read from partition AND it is not yet 
exhausted, this is the last RowId that was
+     *     sent in this snapshot order</li>
+     *     <li>After we exhausted the partition, this is {@code null}</li>
+     * </ul>
+     */
+    private RowId lastRowId;
+
+    private boolean startedToReadPartition = false;
+
+    /**
+     * This becomes {@code true} as soon as we exhaust both the partition and 
out-of-order queue.
+     */
+    private boolean finished = false;
+
+    /**
+     * Creates a new instance.
+     */
+    public OutgoingSnapshot(UUID id, PartitionAccess partition, 
OutgoingSnapshotRegistry outgoingSnapshotRegistry) {
+        this.id = id;
+        this.partition = partition;
+        this.outgoingSnapshotRegistry = outgoingSnapshotRegistry;
+
+        lastRowId = partition.minRowId();
+    }
+
+    /**
+     * Returns the ID of this snapshot.
+     */
+    public UUID id() {
+        return id;
+    }
+
+    /**
+     * Returns the key of the corresponding partition.
+     *
+     * @return Partition key.
+     */
+    public PartitionKey partitionKey() {
+        return partition.key();
+    }
+
     /**
      * Reads a snapshot meta and returns a future with the response.
      *
      * @param metaRequest Meta request.
      */
     CompletableFuture<SnapshotMetaResponse> 
handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         return null;
     }
 
     /**
      * Reads chunk of partition data and returns a future with the response.
      *
-     * @param mvDataRequest Data request.
+     * @param request Data request.
      */
-    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-        return null;
+    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest request) {
+        // TODO: IGNITE-17935 - executor?
+
+        assert !finished;
+
+        long totalBatchSize = 0;
+        List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>();
+
+        while (true) {
+            acquireLock();
+
+            try {
+                totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, 
request);
+
+                totalBatchSize = tryProcessRowFromPartition(batch, 
totalBatchSize, request);
+
+                if (exhaustedPartition() && outOfOrderMvData.isEmpty()) {
+                    finished = true;
+                }
+
+                if (finished || batchIsFull(request, totalBatchSize)) {
+                    break;
+                }
+            } finally {
+                releaseLock();
+            }
+        }
+
+        // We unregister itself outside the lock to avoid a deadlock, because 
SnapshotAwareMvPartitionStorage takes
+        // locks in partitionSnapshots.lock -> snapshot.lock; if we did it 
under lock, we would take locks in the
+        // opposite order. That's why we need finished flag and cooperation 
with SnapshotAwareMvPartitionStorage
+        // (calling our isFinished()).
+        if (finished) {
+            outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id);
+        }
+
+        SnapshotMvDataResponse response = 
messagesFactory.snapshotMvDataResponse()
+                .rows(batch)
+                .finish(finished)
+                .build();
+        return CompletableFuture.completedFuture(response);
+    }
+
+    private long fillWithOutOfOrderRows(
+            List<SnapshotMvDataResponse.ResponseEntry> rowEntries,
+            long totalBytesBefore,
+            SnapshotMvDataRequest request
+    ) {
+        long totalBytesAfter = totalBytesBefore;
+
+        while (totalBytesAfter < request.batchSizeHint()) {
+            SnapshotMvDataResponse.ResponseEntry rowEntry = 
outOfOrderMvData.poll();
+
+            if (rowEntry == null) {
+                break;
+            }
+
+            rowEntries.add(rowEntry);
+            totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions());
+        }
+
+        return totalBytesAfter;
+    }
+
+    private long rowSizeInBytes(List<ByteBuffer> rowVersions) {
+        long sum = 0;
+
+        for (ByteBuffer buf : rowVersions) {
+            if (buf != null) {
+                sum += buf.limit();
+            }
+        }
+
+        return sum;
+    }
+
+    private long 
tryProcessRowFromPartition(List<SnapshotMvDataResponse.ResponseEntry> batch, 
long totalBatchSize,
+            SnapshotMvDataRequest request) {
+        if (batchIsFull(request, totalBatchSize) || exhaustedPartition()) {
+            return totalBatchSize;
+        }
+
+        if (!startedToReadPartition) {
+            lastRowId = partition.closestRowId(lastRowId);
+
+            startedToReadPartition = true;
+        } else {
+            lastRowId = partition.closestRowId(lastRowId.increment());
+        }
+
+        if (!exhaustedPartition()) {
+            if (!overwrittenRowIds.remove(lastRowId)) {
+                List<ReadResult> rowVersions = new 
ArrayList<>(partition.rowVersions(lastRowId));
+                Collections.reverse(rowVersions);
+                SnapshotMvDataResponse.ResponseEntry rowEntry = 
rowEntry(lastRowId, rowVersions);
+
+                batch.add(rowEntry);
+
+                totalBatchSize += rowSizeInBytes(rowEntry.rowVersions());
+            }
+        }
+
+        return totalBatchSize;
+    }
+
+    private boolean batchIsFull(SnapshotMvDataRequest request, long 
totalBatchSize) {
+        return totalBatchSize >= request.batchSizeHint();
+    }
+
+    private boolean exhaustedPartition() {
+        return lastRowId == null;
+    }
+
+    private SnapshotMvDataResponse.ResponseEntry rowEntry(RowId rowId, 
List<ReadResult> rowVersions) {
+        List<ByteBuffer> buffers = new ArrayList<>();
+        List<HybridTimestamp> commitTimestamps = new ArrayList<>();
+        UUID transactionId = null;
+        UUID commitTableId = null;
+        int commitPartitionId = ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+
+        for (ReadResult version : rowVersions) {
+            BinaryRow row = version.binaryRow();
+
+            buffers.add(row == null ? null : row.byteBuffer());
+
+            if (version.isWriteIntent()) {
+                transactionId = version.transactionId();
+                commitTableId = version.commitTableId();
+                commitPartitionId = version.commitPartitionId();
+            } else {
+                commitTimestamps.add(version.commitTimestamp());
+            }
+        }
+
+        return messagesFactory.responseEntry()
+                .rowId(new UUID(rowId.mostSignificantBits(), 
rowId.leastSignificantBits()))

Review Comment:
   This is what I meant. A `uuid()` method may be useful here.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -55,7 +288,76 @@ CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMv
      * @param txDataRequest Data request.
      */
     CompletableFuture<SnapshotTxDataResponse> 
handleSnapshotTxDataRequest(SnapshotTxDataRequest txDataRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         return null;
     }
+
+    /**
+     * Acquires this snapshot lock.
+     */
+    public void acquireLock() {
+        rowOperationsLock.lock();
+    }
+
+    /**
+     * Releases this snapshot lock.
+     */
+    public void releaseLock() {
+        rowOperationsLock.unlock();
+    }
+
+    /**
+     * Whether this snapshot is finished (i.e. it already sent all the MV data 
and is not going to send anything else).
+     *
+     * <p>Must be called under snapshot lock.
+     *
+     * @return {@code true} if finished.
+     */
+    public boolean isFinished() {
+        return finished;
+    }
+
+    /**
+     * Adds a {@link RowId} to the collection of IDs that need to be skipped 
during normal snapshot row sending.
+     *
+     * <p>Must be called under snapshot lock.
+     *
+     * @param rowId RowId to add.
+     * @return {@code true} if the given RowId was added as it was not yet in 
the collection of IDs to skip.
+     */
+    public boolean addOverwrittenRowId(RowId rowId) {
+        return overwrittenRowIds.add(rowId);
+    }
+
+    /**
+     * Returns {@code true} if the given {@link RowId} does not interfere with 
the rows that this snapshot is going
+     * to be sent in the normal snapshot rows sending order.
+     *
+     * <p>Must be called under snapshot lock.
+     *
+     * @param rowId RowId.
+     * @return {@code true} if the given RowId is already passed by the 
snapshot in normal rows sending order.
+     */
+    public boolean alreadyPassed(RowId rowId) {
+        if (!startedToReadPartition) {
+            return false;
+        }
+        if (exhaustedPartition()) {
+            return true;
+        }
+
+        return rowId.compareTo(lastRowId) <= 0;
+    }
+
+    /**
+     * Enqueues a row for out-of-order sending.
+     *
+     * <p>Must be called under snapshot lock.
+     *
+     * @param rowId {@link RowId} of the row.
+     * @param rowVersions Versions of the row (oldest to newest).
+     */
+    public void enqueueForSending(RowId rowId, List<ReadResult> rowVersions) {

Review Comment:
   I see an inconsistency here. Regular sends read data in this class, but RAFT 
commands must themselves read rows before appending them to the queue. This is 
a little bizarre; can we just pass the RowId here and do all the magic inside?



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/TestPartitionDataStorage.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lock.AutoLockup;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test implementation of {@link PartitionDataStorage}.
+ */
+public class TestPartitionDataStorage implements PartitionDataStorage {
+    private final MvPartitionStorage partitionStorage;
+
+    private final Lock partitionSnapshotsLock = new ReentrantLock();
+
+    public TestPartitionDataStorage(MvPartitionStorage partitionStorage) {
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public <V> V runConsistently(WriteClosure<V> closure) throws 
StorageException {
+        return partitionStorage.runConsistently(closure);
+    }
+
+    @SuppressWarnings("Convert2Lambda")
+    @Override
+    public AutoLockup acquirePartitionSnapshotsReadLock() {
+        partitionSnapshotsLock.lock();
+
+        return new AutoLockup() {
+            @Override
+            public void close() {
+                partitionSnapshotsLock.unlock();
+            }
+        };
+    }
+
+    @Override
+    public CompletableFuture<Void> flush() {
+        return partitionStorage.flush();
+    }
+
+    @Override
+    public long lastAppliedIndex() {
+        return partitionStorage.lastAppliedIndex();
+    }
+
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) throws 
StorageException {
+        partitionStorage.lastAppliedIndex(lastAppliedIndex);
+    }
+
+    @Override
+    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, 
UUID txId, UUID commitTableId,
+            int commitPartitionId) throws TxIdMismatchException, 
StorageException {
+        return partitionStorage.addWrite(rowId, row, txId, commitTableId, 
commitPartitionId);
+    }
+
+    @Override
+    public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException 
{
+        return partitionStorage.abortWrite(rowId);
+    }
+
+    @Override
+    public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
+        partitionStorage.commitWrite(rowId, timestamp);
+    }
+
+    @Override
+    public MvPartitionStorage getStorage() {
+        return partitionStorage;
+    }
+
+    @Override
+    public void close() throws Exception {

Review Comment:
   I don't know what you're going to do with this information honestly



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/TestPartitionDataStorage.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lock.AutoLockup;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test implementation of {@link PartitionDataStorage}.
+ */
+public class TestPartitionDataStorage implements PartitionDataStorage {
+    private final MvPartitionStorage partitionStorage;
+
+    private final Lock partitionSnapshotsLock = new ReentrantLock();
+
+    public TestPartitionDataStorage(MvPartitionStorage partitionStorage) {
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public <V> V runConsistently(WriteClosure<V> closure) throws 
StorageException {
+        return partitionStorage.runConsistently(closure);
+    }
+
+    @SuppressWarnings("Convert2Lambda")

Review Comment:
   But why not use a lambda? It looks convenient to me. BTW, I'd advise 
declaring AutoLockup as a `@FunctionalInterface`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java:
##########
@@ -64,7 +64,8 @@ interface ResponseEntry extends NetworkMessage {
         /** Commit table id for write-intent if it's present. */
         @Nullable UUID commitTableId();
 
-        /** Commit partition id for write-intent if it's present. {@code -1} 
otherwise. */
+        /** Commit partition id for write-intent if it's present.
+         *  {@link 
org.apache.ignite.internal.storage.ReadResult#UNDEFINED_COMMIT_PARTITION_ID} 
otherwise. */

Review Comment:
   A short name would look nicer; can you add this class to the imports?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lock.AutoLockup;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Provides access to both MV (multi-version) data and transactions data of a 
partition.
+ *
+ * <p>Methods writing to MV storage ({@link #addWrite(RowId, BinaryRow, UUID, 
UUID, int)}, {@link #abortWrite(RowId)}
+ * and {@link #commitWrite(RowId, HybridTimestamp)}) and TX data storage MUST 
be invoked under a lock acquired using
+ * {@link #acquirePartitionSnapshotsReadLock()}.
+ *
+ * <p>Each MvPartitionStorage instance represents exactly one partition. All 
RowIds within a partition are sorted consistently with the
+ * {@link RowId#compareTo} comparison order.
+ *
+ * @see org.apache.ignite.internal.storage.MvPartitionStorage
+ * @see org.apache.ignite.internal.tx.storage.state.TxStateStorage
+ */
+public interface PartitionDataStorage extends AutoCloseable {
+    /**
+     * Executes {@link WriteClosure} atomically, meaning that partial result 
of an incomplete closure will never be written to the
+     * physical device, thus guaranteeing data consistency after restart. 
Simply runs the closure in case of a volatile storage.
+     *
+     * @param closure Data access closure to be executed.
+     * @param <V> Type of the result returned from the closure.
+     * @return Closure result.
+     * @throws StorageException If failed to write data to the storage.
+     */

Review Comment:
   You should probably add links like `@see 
MvPartitionStorage#runConsistently(WriteClosure)` in all methods



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/MvStoragePartitionAccess.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link PartitionAccess} that adapts an {@link MvPartitionStorage}.
+ */
+public class MvStoragePartitionAccess implements PartitionAccess {
+    private final PartitionKey partitionKey;
+    private final MvPartitionStorage partitionStorage;
+
+    public MvStoragePartitionAccess(PartitionKey partitionKey, 
MvPartitionStorage partitionStorage) {
+        this.partitionKey = partitionKey;
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public PartitionKey key() {
+        return partitionKey;
+    }
+
+    @Override
+    public long persistedIndex() {
+        return partitionStorage.persistedIndex();
+    }
+
+    @Override
+    public RowId minRowId() {
+        return RowId.lowestRowId(partitionKey.partitionId());
+    }
+
+    @Override
+    public @Nullable RowId closestRowId(RowId lowerBound) {
+        return partitionStorage.closestRowId(lowerBound);
+    }
+
+    @Override
+    public List<ReadResult> rowVersions(RowId rowId) {
+        try (Cursor<ReadResult> cursor = partitionStorage.scanVersions(rowId)) 
{
+            List<ReadResult> versions = new ArrayList<>();
+
+            for (ReadResult version : cursor) {
+                versions.add(version);
+            }
+
+            return List.copyOf(versions);

Review Comment:
   What's the idea behind a "copyOf" here? It's safe to return a mutable 
collection.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/MvStoragePartitionAccess.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link PartitionAccess} that adapts an {@link MvPartitionStorage}.
+ */
+public class MvStoragePartitionAccess implements PartitionAccess {
+    private final PartitionKey partitionKey;
+    private final MvPartitionStorage partitionStorage;
+
+    public MvStoragePartitionAccess(PartitionKey partitionKey, 
MvPartitionStorage partitionStorage) {
+        this.partitionKey = partitionKey;
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public PartitionKey key() {
+        return partitionKey;
+    }
+
+    @Override
+    public long persistedIndex() {
+        return partitionStorage.persistedIndex();
+    }
+
+    @Override
+    public RowId minRowId() {

Review Comment:
   I think that this method shouldn't be here, because it has the same 
implementation everywhere. Just make a `partitionId()` and that's it.
   EDIT: I see that there's a "partitionKey" that contains partition id



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -17,36 +17,269 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 
 /**
- * Outgoing snapshot.
+ * Outgoing snapshot. It corresponds to exactly one partition.
+ *
+ * <p>The snapshot has a lock needed for interaction with {@link 
SnapshotAwarePartitionDataStorage}.
  */
 public class OutgoingSnapshot {
+    private static final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();

Review Comment:
   So it's `static final` after all :)
   Unorthodox naming, should be upper case



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * Uniquely identifies a partition. This is a pair of internal table ID and 
partition number (aka partition ID).
+ */
+public class PartitionKey {
+    private final UUID tableId;
+    private final int partitionId;
+
+    /**
+     * Returns partition ID.
+     *
+     * @return Partition ID.
+     */
+    public int partitionId() {
+        return partitionId;
+    }
+
+    /**
+     * Constructs a new partition key.
+     */
+    public PartitionKey(UUID tableId, int partitionId) {
+        Objects.requireNonNull(tableId, "tableId cannot be null");
+
+        this.tableId = tableId;
+        this.partitionId = partitionId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PartitionKey that = (PartitionKey) o;
+        return partitionId == that.partitionId && tableId.equals(that.tableId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(tableId, partitionId);
+    }
+
+    @Override
+    public String toString() {
+        return "PartitionKey{"

Review Comment:
   Looks like an auto-generated code from IDEA. We have a different format, you 
can use utility class S or just change the code



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -860,8 +881,8 @@ private RaftGroupOptions groupOptionsForPartition(
         raftGroupOptions.snapshotStorageFactory(new 
PartitionSnapshotStorageFactory(
                 raftMgr.topologyService(),
                 //TODO IGNITE-17302 Use miniumum from mv storage and tx state 
storage.
-                new OutgoingSnapshotsManager(raftMgr.messagingService()),
-                partitionStorage::persistedIndex,
+                outgoingSnapshotsManager,
+                new MvStoragePartitionAccess(partitionKey(internalTbl, 
partId), partitionStorage),

Review Comment:
   Please ask @alievmirza for his PR to validate what he did in this exact 
line, so that you both know what's happening. He showed an interest in what 
you're doing btw



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -55,7 +288,76 @@ CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMv
      * @param txDataRequest Data request.
      */
     CompletableFuture<SnapshotTxDataResponse> 
handleSnapshotTxDataRequest(SnapshotTxDataRequest txDataRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         return null;
     }
+
+    /**
+     * Acquires this snapshot lock.
+     */
+    public void acquireLock() {
+        rowOperationsLock.lock();
+    }
+
+    /**
+     * Releases this snapshot lock.
+     */
+    public void releaseLock() {
+        rowOperationsLock.unlock();
+    }
+
+    /**
+     * Whether this snapshot is finished (i.e. it already sent all the MV data 
and is not going to send anything else).
+     *
+     * <p>Must be called under snapshot lock.
+     *
+     * @return {@code true} if finished.
+     */
+    public boolean isFinished() {
+        return finished;
+    }
+
+    /**
+     * Adds a {@link RowId} to the collection of IDs that need to be skipped 
during normal snapshot row sending.
+     *
+     * <p>Must be called under snapshot lock.
+     *
+     * @param rowId RowId to add.
+     * @return {@code true} if the given RowId was added as it was not yet in 
the collection of IDs to skip.
+     */
+    public boolean addOverwrittenRowId(RowId rowId) {

Review Comment:
   I see that you decided to leave the name from the description. Is it good 
enough? To be honest, I didn't think too hard before choosing it



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -17,36 +17,269 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 
 /**
- * Outgoing snapshot.
+ * Outgoing snapshot. It corresponds to exactly one partition.
+ *
+ * <p>The snapshot has a lock needed for interaction with {@link 
SnapshotAwarePartitionDataStorage}.
  */
 public class OutgoingSnapshot {
+    private static final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
+
+    private final UUID id;
+
+    private final PartitionAccess partition;
+
+    private final OutgoingSnapshotRegistry outgoingSnapshotRegistry;
+
+    /**
+     * Lock that is used for mutual exclusion of snapshot reading (by this 
class) and threads that write to the same
+     * partition (currently, via {@link SnapshotAwarePartitionDataStorage}).
+     */
+    private final Lock rowOperationsLock = new ReentrantLock();
+
+    /**
+     * {@link RowId}s for which the corresponding rows were sent out of order 
(relative to the order in which this
+     * snapshot sends rows), hence they must be skipped when sending rows 
normally.
+     */
+    private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>();
+
+    // TODO: IGNITE-17935 - manage queue size
+    /**
+     * Rows that need to be sent out of order (relative to the order in which 
this snapshot sends rows).
+     * Versions inside rows are in oldest-to-newest order.
+     */
+    private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData 
= new LinkedList<>();
+
+    /**
+     * {@link RowId} used to point (most of the time) to the last processed 
row. More precisely:
+     *
+     * <ul>
+     *     <li>Before we started to read from the partition, this is equal to 
lowest theoretically possible
+     *     {@link RowId} for this partition</li>
+     *     <li>If we started to read from partition AND it is not yet 
exhausted, this is the last RowId that was
+     *     sent in this snapshot order</li>
+     *     <li>After we exhausted the partition, this is {@code null}</li>
+     * </ul>
+     */
+    private RowId lastRowId;
+
+    private boolean startedToReadPartition = false;
+
+    /**
+     * This becomes {@code true} as soon as we exhaust both the partition and 
out-of-order queue.
+     */
+    private boolean finished = false;
+
+    /**
+     * Creates a new instance.
+     */
+    public OutgoingSnapshot(UUID id, PartitionAccess partition, 
OutgoingSnapshotRegistry outgoingSnapshotRegistry) {
+        this.id = id;
+        this.partition = partition;
+        this.outgoingSnapshotRegistry = outgoingSnapshotRegistry;
+
+        lastRowId = partition.minRowId();
+    }
+
+    /**
+     * Returns the ID of this snapshot.
+     */
+    public UUID id() {
+        return id;
+    }
+
+    /**
+     * Returns the key of the corresponding partition.
+     *
+     * @return Partition key.
+     */
+    public PartitionKey partitionKey() {
+        return partition.key();
+    }
+
     /**
      * Reads a snapshot meta and returns a future with the response.
      *
      * @param metaRequest Meta request.
      */
     CompletableFuture<SnapshotMetaResponse> 
handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         return null;
     }
 
     /**
      * Reads chunk of partition data and returns a future with the response.
      *
-     * @param mvDataRequest Data request.
+     * @param request Data request.
      */
-    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-        return null;
+    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest request) {
+        // TODO: IGNITE-17935 - executor?
+
+        assert !finished;
+
+        long totalBatchSize = 0;
+        List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>();
+
+        while (true) {
+            acquireLock();
+
+            try {
+                totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, 
request);
+
+                totalBatchSize = tryProcessRowFromPartition(batch, 
totalBatchSize, request);
+
+                if (exhaustedPartition() && outOfOrderMvData.isEmpty()) {
+                    finished = true;
+                }
+
+                if (finished || batchIsFull(request, totalBatchSize)) {
+                    break;
+                }
+            } finally {
+                releaseLock();
+            }
+        }
+
+        // We unregister itself outside the lock to avoid a deadlock, because 
SnapshotAwareMvPartitionStorage takes
+        // locks in partitionSnapshots.lock -> snapshot.lock; if we did it 
under lock, we would take locks in the
+        // opposite order. That's why we need finished flag and cooperation 
with SnapshotAwareMvPartitionStorage
+        // (calling our isFinished()).
+        if (finished) {
+            outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id);
+        }
+
+        SnapshotMvDataResponse response = 
messagesFactory.snapshotMvDataResponse()
+                .rows(batch)
+                .finish(finished)
+                .build();
+        return CompletableFuture.completedFuture(response);
+    }
+
+    private long fillWithOutOfOrderRows(
+            List<SnapshotMvDataResponse.ResponseEntry> rowEntries,
+            long totalBytesBefore,
+            SnapshotMvDataRequest request
+    ) {
+        long totalBytesAfter = totalBytesBefore;
+
+        while (totalBytesAfter < request.batchSizeHint()) {
+            SnapshotMvDataResponse.ResponseEntry rowEntry = 
outOfOrderMvData.poll();
+
+            if (rowEntry == null) {
+                break;
+            }
+
+            rowEntries.add(rowEntry);
+            totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions());
+        }
+
+        return totalBytesAfter;
+    }
+
+    private long rowSizeInBytes(List<ByteBuffer> rowVersions) {
+        long sum = 0;
+
+        for (ByteBuffer buf : rowVersions) {
+            if (buf != null) {
+                sum += buf.limit();

Review Comment:
   Technically, you should subtract the position, but our rules are very vague 
here. Maybe position must be 0 here, who knows? We'll figure it out one day



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lock.AutoLockup;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * {@link MvPartitionStorage} decorator that adds snapshot awareness. This 
means that writes coordinate with ongoing
+ * snapshots to make sure that the writes do not interfere with the snapshots.
+ */
+public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage 
{

Review Comment:
   Like here. What's the point of having two almost identical implementations? 
This can be a simple class, without dedicated interface that comes with it



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -17,36 +17,269 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 
 /**
- * Outgoing snapshot.
+ * Outgoing snapshot. It corresponds to exactly one partition.
+ *
+ * <p>The snapshot has a lock needed for interaction with {@link 
SnapshotAwarePartitionDataStorage}.
  */
 public class OutgoingSnapshot {
+    private static final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
+
+    private final UUID id;
+
+    private final PartitionAccess partition;
+
+    private final OutgoingSnapshotRegistry outgoingSnapshotRegistry;
+
+    /**
+     * Lock that is used for mutual exclusion of snapshot reading (by this 
class) and threads that write to the same
+     * partition (currently, via {@link SnapshotAwarePartitionDataStorage}).
+     */
+    private final Lock rowOperationsLock = new ReentrantLock();
+
+    /**
+     * {@link RowId}s for which the corresponding rows were sent out of order 
(relative to the order in which this
+     * snapshot sends rows), hence they must be skipped when sending rows 
normally.
+     */
+    private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>();
+
+    // TODO: IGNITE-17935 - manage queue size
+    /**
+     * Rows that need to be sent out of order (relative to the order in which 
this snapshot sends rows).
+     * Versions inside rows are in oldest-to-newest order.
+     */
+    private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData 
= new LinkedList<>();
+
+    /**
+     * {@link RowId} used to point (most of the time) to the last processed 
row. More precisely:
+     *
+     * <ul>
+     *     <li>Before we started to read from the partition, this is equal to 
lowest theoretically possible
+     *     {@link RowId} for this partition</li>
+     *     <li>If we started to read from partition AND it is not yet 
exhausted, this is the last RowId that was
+     *     sent in this snapshot order</li>
+     *     <li>After we exhausted the partition, this is {@code null}</li>
+     * </ul>
+     */
+    private RowId lastRowId;
+
+    private boolean startedToReadPartition = false;
+
+    /**
+     * This becomes {@code true} as soon as we exhaust both the partition and 
out-of-order queue.
+     */
+    private boolean finished = false;
+
+    /**
+     * Creates a new instance.
+     */
+    public OutgoingSnapshot(UUID id, PartitionAccess partition, 
OutgoingSnapshotRegistry outgoingSnapshotRegistry) {
+        this.id = id;
+        this.partition = partition;
+        this.outgoingSnapshotRegistry = outgoingSnapshotRegistry;
+
+        lastRowId = partition.minRowId();
+    }
+
+    /**
+     * Returns the ID of this snapshot.
+     */
+    public UUID id() {
+        return id;
+    }
+
+    /**
+     * Returns the key of the corresponding partition.
+     *
+     * @return Partition key.
+     */
+    public PartitionKey partitionKey() {
+        return partition.key();
+    }
+
     /**
      * Reads a snapshot meta and returns a future with the response.
      *
      * @param metaRequest Meta request.
      */
     CompletableFuture<SnapshotMetaResponse> 
handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         return null;
     }
 
     /**
      * Reads chunk of partition data and returns a future with the response.
      *
-     * @param mvDataRequest Data request.
+     * @param request Data request.
      */
-    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-        return null;
+    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest request) {
+        // TODO: IGNITE-17935 - executor?
+
+        assert !finished;
+
+        long totalBatchSize = 0;
+        List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>();
+
+        while (true) {
+            acquireLock();
+
+            try {
+                totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, 
request);
+
+                totalBatchSize = tryProcessRowFromPartition(batch, 
totalBatchSize, request);
+
+                if (exhaustedPartition() && outOfOrderMvData.isEmpty()) {
+                    finished = true;
+                }
+
+                if (finished || batchIsFull(request, totalBatchSize)) {
+                    break;
+                }
+            } finally {
+                releaseLock();
+            }
+        }
+
+        // We unregister itself outside the lock to avoid a deadlock, because 
SnapshotAwareMvPartitionStorage takes
+        // locks in partitionSnapshots.lock -> snapshot.lock; if we did it 
under lock, we would take locks in the
+        // opposite order. That's why we need finished flag and cooperation 
with SnapshotAwareMvPartitionStorage
+        // (calling our isFinished()).
+        if (finished) {
+            outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id);
+        }
+
+        SnapshotMvDataResponse response = 
messagesFactory.snapshotMvDataResponse()
+                .rows(batch)
+                .finish(finished)
+                .build();
+        return CompletableFuture.completedFuture(response);
+    }
+
+    private long fillWithOutOfOrderRows(
+            List<SnapshotMvDataResponse.ResponseEntry> rowEntries,
+            long totalBytesBefore,
+            SnapshotMvDataRequest request
+    ) {
+        long totalBytesAfter = totalBytesBefore;
+
+        while (totalBytesAfter < request.batchSizeHint()) {
+            SnapshotMvDataResponse.ResponseEntry rowEntry = 
outOfOrderMvData.poll();
+
+            if (rowEntry == null) {
+                break;
+            }
+
+            rowEntries.add(rowEntry);
+            totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions());
+        }
+
+        return totalBytesAfter;
+    }
+
+    private long rowSizeInBytes(List<ByteBuffer> rowVersions) {
+        long sum = 0;
+
+        for (ByteBuffer buf : rowVersions) {
+            if (buf != null) {
+                sum += buf.limit();
+            }
+        }
+
+        return sum;
+    }
+
+    private long 
tryProcessRowFromPartition(List<SnapshotMvDataResponse.ResponseEntry> batch, 
long totalBatchSize,
+            SnapshotMvDataRequest request) {
+        if (batchIsFull(request, totalBatchSize) || exhaustedPartition()) {
+            return totalBatchSize;
+        }
+
+        if (!startedToReadPartition) {
+            lastRowId = partition.closestRowId(lastRowId);
+
+            startedToReadPartition = true;
+        } else {
+            lastRowId = partition.closestRowId(lastRowId.increment());
+        }
+
+        if (!exhaustedPartition()) {
+            if (!overwrittenRowIds.remove(lastRowId)) {
+                List<ReadResult> rowVersions = new 
ArrayList<>(partition.rowVersions(lastRowId));
+                Collections.reverse(rowVersions);
+                SnapshotMvDataResponse.ResponseEntry rowEntry = 
rowEntry(lastRowId, rowVersions);
+
+                batch.add(rowEntry);
+
+                totalBatchSize += rowSizeInBytes(rowEntry.rowVersions());
+            }
+        }
+
+        return totalBatchSize;
+    }
+
+    private boolean batchIsFull(SnapshotMvDataRequest request, long 
totalBatchSize) {
+        return totalBatchSize >= request.batchSizeHint();
+    }
+
+    private boolean exhaustedPartition() {
+        return lastRowId == null;
+    }
+
+    private SnapshotMvDataResponse.ResponseEntry rowEntry(RowId rowId, 
List<ReadResult> rowVersions) {
+        List<ByteBuffer> buffers = new ArrayList<>();
+        List<HybridTimestamp> commitTimestamps = new ArrayList<>();
+        UUID transactionId = null;
+        UUID commitTableId = null;
+        int commitPartitionId = ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+
+        for (ReadResult version : rowVersions) {

Review Comment:
   Ok, I have a complaint. You instantiate a new collection, reverse it and 
then only use it for iteration. Why do we waste objects? Can we just iterate it 
backwards? It's fine to assert that list is `RandomAccess` if we need to be 
100% sure.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/PartitionsSnapshots.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+
+/**
+ * Allows to obtain {@link PartitionSnapshots} instances.
+ */
+public interface PartitionsSnapshots {

Review Comment:
   Don't you have a feeling that there are too many interfaces?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class OutgoingSnapshotTest {
+    @Mock
+    private PartitionAccess partitionAccess;
+
+    @Mock
+    private OutgoingSnapshotRegistry snapshotRegistry;
+
+    private OutgoingSnapshot snapshot;
+
+    private final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
+
+    private final RowId lowestRowId = RowId.lowestRowId(1);
+    private final RowId rowId1 = 
Objects.requireNonNull(lowestRowId.increment());
+    private final RowId rowId2 = Objects.requireNonNull(rowId1.increment());
+    private final RowId rowId3 = Objects.requireNonNull(rowId2.increment());
+
+    private final RowId rowIdOutOfOrder;
+
+    {

Review Comment:
   Why don't you use a `setUp` method instead of an instance initializer block?
   Oh wait, you already have a `@BeforeEach` method; why did you split the 
initialization into two sections then?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java:
##########
@@ -141,8 +183,50 @@ private CompletableFuture<Void> respond(
             NetworkAddress sender,
             Long correlationId
     ) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         // Handle offline sender and stopped manager.
         return messagingService.respond(sender, response, correlationId);
     }
+
+    @Override
+    public PartitionSnapshots partitionSnapshots(PartitionKey partitionKey) {
+        return getPartitionSnapshots(partitionKey);
+    }
+
+    private static class PartitionSnapshotsImpl implements PartitionSnapshots {
+        private final List<OutgoingSnapshot> snapshots = new ArrayList<>();
+
+        private final List<OutgoingSnapshot> unmodifiableSnapshotsView = 
unmodifiableList(snapshots);
+
+        private final ReadWriteLock lock = new ReentrantReadWriteLock();
+        private final ReusableLockLockup readLockLockup = new 
ReusableLockLockup(lock.readLock());
+
+        /**
+         * Adds the given snapshot to the list of snapshots while holding the write lock.
+         *
+         * @param snapshot Snapshot to add.
+         */
+        private void addUnderLock(OutgoingSnapshot snapshot) {
+            lock.writeLock().lock();
+
+            try {
+                snapshots.add(snapshot);
+            } finally {
+                // Release in finally so the write lock is not leaked if the addition throws.
+                lock.writeLock().unlock();
+            }
+        }
+
+        /**
+         * Removes the given snapshot from the list of snapshots while holding the write lock.
+         *
+         * @param snapshot Snapshot to remove.
+         */
+        private void removeUnderLock(OutgoingSnapshot snapshot) {
+            // Acquire before the try block: unlock() in finally must only run when the lock is actually held,
+            // otherwise it throws IllegalMonitorStateException and the list is modified without protection.
+            lock.writeLock().lock();
+
+            try {
+                snapshots.remove(snapshot);
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        @Override
+        public AutoLockup acquireReadLock() {
+            return readLockLockup.acquireLock();
+        }
+
+        /**
+         * Returns a read-only view of the snapshots list. The view is created once and backed by the
+         * underlying list, so it reflects later additions and removals.
+         *
+         * @return Unmodifiable view of the snapshots.
+         */
+        @Override
+        public List<OutgoingSnapshot> ongoingSnapshots() {
+            return this.unmodifiableSnapshotsView;
+        }

Review Comment:
   What's the point of the extra field? Are we this paranoid?
   This is extra code that exists for no good reason, in my opinion. It can be 
safely replaced by returning `List<? extends OutgoingSnapshot>`, which has the 
same effect as long as the caller doesn't use raw types.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -17,36 +17,269 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 
 /**
- * Outgoing snapshot.
+ * Outgoing snapshot. It corresponds to exactly one partition.
+ *
+ * <p>The snapshot has a lock needed for interaction with {@link 
SnapshotAwareMvPartitionStorage}.
  */
 public class OutgoingSnapshot {
+    private static final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
+
+    private final UUID id;
+
+    private final PartitionAccess partition;
+
+    private final OutgoingSnapshotRegistry outgoingSnapshotRegistry;
+
+    /**
+     * Lock that is used for mutual exclusion of snapshot reading (by this 
class) and threads that write to the same
+     * partition (currently, via {@link SnapshotAwareMvPartitionStorage}).
+     */
+    private final Lock rowOperationsLock = new ReentrantLock();
+
+    /**
+     * {@link RowId}s for which the corresponding rows were sent out of order 
(relative to the order in which this
+     * snapshot sends rows), hence they must be skipped when sending rows 
normally.
+     */
+    private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>();
+
+    // TODO: IGNITE-17935 - manage queue size
+    /**
+     * Rows that need to be sent out of order (relative to the order in which 
this snapshot sends rows).
+     * Versions inside rows are in oldest-to-newest order.
+     */
+    private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData 
= new LinkedList<>();
+
+    /**
+     * {@link RowId} used to point (most of the time) to the last processed 
row. More precisely:
+     *
+     * <ul>
+     *     <li>Before we started to read from the partition, this is equal to 
lowest theoretically possible
+     *     {@link RowId} for this partition</li>
+     *     <li>If we started to read from partition AND it is not yet 
exhausted, this is the last RowId that was
+     *     sent in this snapshot order</li>
+     *     <li>After we exhausted the partition, this is {@code null}</li>
+     * </ul>
+     */
+    private RowId lastRowId;
+
+    private boolean startedToReadPartition = false;
+
+    /**
+     * This becomes {@code true} as soon as we exhaust both the partition and 
out-of-order queue.
+     */
+    private boolean finished = false;
+
+    /**
+     * Creates a new outgoing snapshot of a single partition.
+     *
+     * @param id Snapshot ID.
+     * @param partition Access to the partition the snapshot reads from.
+     * @param outgoingSnapshotRegistry Registry of outgoing snapshots.
+     */
+    public OutgoingSnapshot(UUID id, PartitionAccess partition, OutgoingSnapshotRegistry outgoingSnapshotRegistry) {
+        this.id = id;
+        this.outgoingSnapshotRegistry = outgoingSnapshotRegistry;
+        this.partition = partition;
+
+        // Reading has not started yet, so the cursor starts at the minimal row ID of the partition.
+        this.lastRowId = partition.minRowId();
+    }
+
+    /**
+     * Returns the identifier of this snapshot.
+     *
+     * @return Snapshot ID.
+     */
+    public UUID id() {
+        return this.id;
+    }
+
+    /**
+     * Returns the key of the corresponding partition.
+     *
+     * @return Partition key.
+     */
+    public PartitionKey partitionKey() {
+        return partition.key();
+    }
+
     /**
      * Reads a snapshot meta and returns a future with the response.
      *
      * @param metaRequest Meta request.
      */
     CompletableFuture<SnapshotMetaResponse> 
handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         return null;
     }
 
     /**
      * Reads chunk of partition data and returns a future with the response.
      *
-     * @param mvDataRequest Data request.
+     * @param request Data request.
      */
-    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-        return null;
+    CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest request) {

Review Comment:
   I agree, this operation should be moved to a thread pool. Currently it's 
going to be executed in... where? Some network message processing pool, and 
that's not a good place.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lock.AutoLockup;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.util.Cursor;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SnapshotAwarePartitionDataStorageTest {
+    @Mock
+    private MvPartitionStorage partitionStorage;
+
+    @Mock
+    private PartitionsSnapshots partitionsSnapshots;
+
+    @Spy
+    private final PartitionKey partitionKey = new 
PartitionKey(UUID.randomUUID(), 1);
+
+    @InjectMocks
+    private SnapshotAwarePartitionDataStorage testedStorage;
+
+    @Mock
+    private PartitionSnapshots partitionSnapshots;
+
+    private final RowId rowId = new RowId(1);

Review Comment:
   1 can be a constant



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+import java.util.UUID;
+import org.apache.ignite.internal.lock.AutoLockup;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.network.MessagingService;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class OutgoingSnapshotsManagerTest {

Review Comment:
   What exactly are we testing here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to