rpuch commented on code in PR #1226:
URL: https://github.com/apache/ignite-3/pull/1226#discussion_r1001868409
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -17,36 +17,269 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
/**
- * Outgoing snapshot.
+ * Outgoing snapshot. It corresponds to exactly one partition.
+ *
+ * <p>The snapshot has a lock needed for interaction with {@link
SnapshotAwareMvPartitionStorage}.
*/
public class OutgoingSnapshot {
+ private static final TableMessagesFactory messagesFactory = new
TableMessagesFactory();
+
+ private final UUID id;
+
+ private final PartitionAccess partition;
+
+ private final OutgoingSnapshotRegistry outgoingSnapshotRegistry;
+
+ /**
+ * Lock that is used for mutual exclusion of snapshot reading (by this
class) and threads that write to the same
+ * partition (currently, via {@link SnapshotAwareMvPartitionStorage}).
+ */
+ private final Lock rowOperationsLock = new ReentrantLock();
+
+ /**
+ * {@link RowId}s for which the corresponding rows were sent out of order
(relative to the order in which this
+ * snapshot sends rows), hence they must be skipped when sending rows
normally.
+ */
+ private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>();
+
+ // TODO: IGNITE-17935 - manage queue size
+ /**
+ * Rows that need to be sent out of order (relative to the order in which
this snapshot sends rows).
+ * Versions inside rows are in oldest-to-newest order.
+ */
+ private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData
= new LinkedList<>();
+
+ /**
+ * {@link RowId} used to point (most of the time) to the last processed
row. More precisely:
+ *
+ * <ul>
+ * <li>Before we started to read from the partition, this is equal to
lowest theoretically possible
+ * {@link RowId} for this partition</li>
+ * <li>If we started to read from partition AND it is not yet
exhausted, this is the last RowId that was
+ * sent in this snapshot order</li>
+ * <li>After we exhausted the partition, this is {@code null}</li>
+ * </ul>
+ */
+ private RowId lastRowId;
+
+ private boolean startedToReadPartition = false;
+
+ /**
+ * This becomes {@code true} as soon as we exhaust both the partition and
out-of-order queue.
+ */
+ private boolean finished = false;
+
+ /**
+ * Creates a new instance.
+ */
+ public OutgoingSnapshot(UUID id, PartitionAccess partition,
OutgoingSnapshotRegistry outgoingSnapshotRegistry) {
+ this.id = id;
+ this.partition = partition;
+ this.outgoingSnapshotRegistry = outgoingSnapshotRegistry;
+
+ lastRowId = partition.minRowId();
+ }
+
+ /**
+ * Returns the ID of this snapshot.
+ */
+ public UUID id() {
+ return id;
+ }
+
+ /**
+ * Returns the key of the corresponding partition.
+ *
+ * @return Partition key.
+ */
+ public PartitionKey partitionKey() {
+ return partition.key();
+ }
+
/**
* Reads a snapshot meta and returns a future with the response.
*
* @param metaRequest Meta request.
*/
CompletableFuture<SnapshotMetaResponse>
handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) {
- //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+ //TODO https://issues.apache.org/jira/browse/IGNITE-17935
return null;
}
/**
* Reads chunk of partition data and returns a future with the response.
*
- * @param mvDataRequest Data request.
+ * @param request Data request.
*/
- CompletableFuture<SnapshotMvDataResponse>
handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) {
- //TODO https://issues.apache.org/jira/browse/IGNITE-17262
- return null;
+ CompletableFuture<SnapshotMvDataResponse>
handleSnapshotMvDataRequest(SnapshotMvDataRequest request) {
Review Comment:
I was planning to fix this in
https://issues.apache.org/jira/browse/IGNITE-17935
Do you think we should do it in this PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]