ibessonov commented on code in PR #1226: URL: https://github.com/apache/ignite-3/pull/1226#discussion_r1001623608
########## modules/core/src/main/java/org/apache/ignite/internal/lock/AutoLockup.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.lock; + +/** + * Represents a lockup (this is an aquisition and owning of a lock like {@link java.util.concurrent.locks.Lock}) Review Comment: This is an oddly specific description for something that only removes an exception from the signature. It doesn't have to represent locks. So why have you decided to tie it exclusively to locks? ########## modules/core/src/test/java/org/apache/ignite/internal/lock/ReusableLockLockupTest.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.lock; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.locks.ReentrantLock; +import org.junit.jupiter.api.Test; + +class ReusableLockLockupTest { + private final ReentrantLock lock = new ReentrantLock(); + + @SuppressWarnings("resource") + @Test + void doesNotLockTheLockOnCreation() { + ReusableLockLockup ignored = new ReusableLockLockup(lock); + + assertFalse(lock.isLocked()); + } + + @SuppressWarnings("resource") + @Test + void locksTheLockOnAcquire() { + try (AutoLockup ignored = new ReusableLockLockup(lock).acquireLock()) { + assertTrue(lock.isLocked()); + } + } + + @SuppressWarnings({"resource", "EmptyTryBlock"}) + @Test + void unlocksTheLockOnClose() { + try (AutoLockup ignored = new ReusableLockLockup(lock).acquireLock()) { + // No-op. + } + + assertFalse(lock.isLocked()); + } +} Review Comment: Please add a trailing newline at the end of the file. ########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/TestPartitionDataStorage.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.distributed; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.lock.AutoLockup; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageException; +import org.apache.ignite.internal.storage.TxIdMismatchException; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.jetbrains.annotations.Nullable; + +/** + * Test implementation of {@link PartitionDataStorage}. 
+ */ +public class TestPartitionDataStorage implements PartitionDataStorage { + private final MvPartitionStorage partitionStorage; + + private final Lock partitionSnapshotsLock = new ReentrantLock(); + + public TestPartitionDataStorage(MvPartitionStorage partitionStorage) { + this.partitionStorage = partitionStorage; + } + + @Override + public <V> V runConsistently(WriteClosure<V> closure) throws StorageException { + return partitionStorage.runConsistently(closure); + } + + @SuppressWarnings("Convert2Lambda") + @Override + public AutoLockup acquirePartitionSnapshotsReadLock() { + partitionSnapshotsLock.lock(); + + return new AutoLockup() { + @Override + public void close() { + partitionSnapshotsLock.unlock(); + } + }; + } + + @Override + public CompletableFuture<Void> flush() { + return partitionStorage.flush(); + } + + @Override + public long lastAppliedIndex() { + return partitionStorage.lastAppliedIndex(); + } + + @Override + public void lastAppliedIndex(long lastAppliedIndex) throws StorageException { + partitionStorage.lastAppliedIndex(lastAppliedIndex); + } + + @Override + public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, + int commitPartitionId) throws TxIdMismatchException, StorageException { + return partitionStorage.addWrite(rowId, row, txId, commitTableId, commitPartitionId); + } + + @Override + public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException { + return partitionStorage.abortWrite(rowId); + } + + @Override + public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException { + partitionStorage.commitWrite(rowId, timestamp); + } + + @Override + public MvPartitionStorage getStorage() { + return partitionStorage; + } + + @Override + public void close() throws Exception { Review Comment: I don't like this method, but I know that you have nothing to do with it. In my view, storages must be closed by the outside code when raft group is stopped. 
Raft listeners didn't instantiate the storages, so they shouldn't be responsible for closing them. ########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/TestPartitionDataStorage.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.distributed; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.lock.AutoLockup; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageException; +import org.apache.ignite.internal.storage.TxIdMismatchException; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.jetbrains.annotations.Nullable; + +/** + * Test implementation of {@link PartitionDataStorage}.
+ */ +public class TestPartitionDataStorage implements PartitionDataStorage { + private final MvPartitionStorage partitionStorage; + + private final Lock partitionSnapshotsLock = new ReentrantLock(); + + public TestPartitionDataStorage(MvPartitionStorage partitionStorage) { + this.partitionStorage = partitionStorage; + } + + @Override + public <V> V runConsistently(WriteClosure<V> closure) throws StorageException { + return partitionStorage.runConsistently(closure); + } + + @SuppressWarnings("Convert2Lambda") + @Override + public AutoLockup acquirePartitionSnapshotsReadLock() { + partitionSnapshotsLock.lock(); + + return new AutoLockup() { + @Override + public void close() { + partitionSnapshotsLock.unlock(); + } + }; + } + + @Override + public CompletableFuture<Void> flush() { + return partitionStorage.flush(); + } + + @Override + public long lastAppliedIndex() { + return partitionStorage.lastAppliedIndex(); + } + + @Override + public void lastAppliedIndex(long lastAppliedIndex) throws StorageException { + partitionStorage.lastAppliedIndex(lastAppliedIndex); + } + + @Override + public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, + int commitPartitionId) throws TxIdMismatchException, StorageException { + return partitionStorage.addWrite(rowId, row, txId, commitTableId, commitPartitionId); + } + + @Override + public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException { + return partitionStorage.abortWrite(rowId); + } + + @Override + public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException { + partitionStorage.commitWrite(rowId, timestamp); + } + + @Override + public MvPartitionStorage getStorage() { Review Comment: This should probably be renamed to `getMvStorage`, since there are other storages as well. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.lock.AutoLockup; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageException; +import org.apache.ignite.internal.storage.TxIdMismatchException; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; + +/** + * Provides access to both MV (multi-version) data and transactions data of a partition. + * + * <p>Methods writing to MV storage ({@link #addWrite(RowId, BinaryRow, UUID, UUID, int)}, {@link #abortWrite(RowId)} + * and {@link #commitWrite(RowId, HybridTimestamp)}) and TX data storage MUST be invoked under a lock acquired using + * {@link #acquirePartitionSnapshotsReadLock()}. + * + * <p>Each MvPartitionStorage instance represents exactly one partition. 
All RowIds within a partition are sorted consistently with the + * {@link RowId#compareTo} comparison order. + * + * @see org.apache.ignite.internal.storage.MvPartitionStorage + * @see org.apache.ignite.internal.tx.storage.state.TxStateStorage + */ +public interface PartitionDataStorage extends AutoCloseable { + /** + * Executes {@link WriteClosure} atomically, meaning that partial result of an incomplete closure will never be written to the + * physical device, thus guaranteeing data consistency after restart. Simply runs the closure in case of a volatile storage. + * + * @param closure Data access closure to be executed. + * @param <V> Type of the result returned from the closure. + * @return Closure result. + * @throws StorageException If failed to write data to the storage. + */ + <V> V runConsistently(WriteClosure<V> closure) throws StorageException; + + /** + * Acquires a read lock on partition snapshots. + * + * @return The acquired lockup. It will be released through {@link AutoLockup#close()} invocation. + */ + AutoLockup acquirePartitionSnapshotsReadLock(); + + /** + * Flushes current state of the data or <i>the state from the nearest future</i> to the storage. It means that the future can be + * completed when the underlying storage {@link org.apache.ignite.internal.storage.MvPartitionStorage#persistedIndex()} is higher + * than {@link #lastAppliedIndex()} at the moment of the method's call. This feature + * allows implementing a batch flush for several partitions at once. + * + * @return Future that's completed when flushing of the data is completed. + */ + CompletableFuture<Void> flush(); + + /** + * Index of the highest write command applied to the storage. {@code 0} if index is unknown. + */ + long lastAppliedIndex(); + + /** + * Sets the last applied index value. + */ + void lastAppliedIndex(long lastAppliedIndex) throws StorageException; + + /** + * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id. 
+ * In details: + * - if there is no uncommitted version, a new uncommitted version is added + * - if there is an uncommitted version belonging to the same transaction, it gets replaced by the given version + * - if there is an uncommitted version belonging to a different transaction, {@link TxIdMismatchException} is thrown + * + * <p>This must be called under a lock acquired using {@link #acquirePartitionSnapshotsReadLock()}. + * + * @param rowId Row id. + * @param row Binary row to update. Key only row means value removal. + * @param txId Transaction id. + * @param commitTableId Commit table id. + * @param commitPartitionId Commit partitionId. + * @return Previous uncommitted row version associated with the row id, or {@code null} if no uncommitted version + * exists before this call + * @throws TxIdMismatchException If there's another pending update associated with different transaction id. + * @throws StorageException If failed to write data to the storage. + */ + @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId) + throws TxIdMismatchException, StorageException; + + /** + * Aborts a pending update of the ongoing uncommitted transaction. Invoked during rollback. + * + * <p>This must be called under a lock acquired using {@link #acquirePartitionSnapshotsReadLock()}. + * + * @param rowId Row id. + * @return Previous uncommitted row version associated with the row id. + * @throws StorageException If failed to write data to the storage. + */ + @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException; + + /** + * Commits a pending update of the ongoing transaction. Invoked during commit. Committed value will be versioned by the given timestamp. + * + * <p>This must be called under a lock acquired using {@link #acquirePartitionSnapshotsReadLock()}. + * + * @param rowId Row id. + * @param timestamp Timestamp to associate with committed value. 
+ * @throws StorageException If failed to write data to the storage. + */ + void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException; + + /** + * Returns the underlying {@link MvPartitionStorage}. Only for tests! + * + * @return Underlying {@link MvPartitionStorage}. + */ + @TestOnly Review Comment: What's the problem with having it in the signature? Maybe we should allow it in non-test usages as well. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/MvStoragePartitionAccess.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.util.Cursor; +import org.jetbrains.annotations.Nullable; + +/** + * {@link PartitionAccess} that adapts an {@link MvPartitionStorage}.
+ */ +public class MvStoragePartitionAccess implements PartitionAccess { + private final PartitionKey partitionKey; + private final MvPartitionStorage partitionStorage; + + public MvStoragePartitionAccess(PartitionKey partitionKey, MvPartitionStorage partitionStorage) { + this.partitionKey = partitionKey; + this.partitionStorage = partitionStorage; + } + + @Override + public PartitionKey key() { + return partitionKey; + } + + @Override + public long persistedIndex() { + return partitionStorage.persistedIndex(); + } + + @Override + public RowId minRowId() { + return RowId.lowestRowId(partitionKey.partitionId()); + } + + @Override + public @Nullable RowId closestRowId(RowId lowerBound) { + return partitionStorage.closestRowId(lowerBound); + } + + @Override + public List<ReadResult> rowVersions(RowId rowId) { + try (Cursor<ReadResult> cursor = partitionStorage.scanVersions(rowId)) { + List<ReadResult> versions = new ArrayList<>(); + + for (ReadResult version : cursor) { + versions.add(version); + } + + return List.copyOf(versions); + } catch (Exception e) { + // TODO: IGNITE-17935 - handle this? Review Comment: Yes, please :) ########## modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java: ########## @@ -84,6 +85,16 @@ public long leastSignificantBits() { return uuid.getLeastSignificantBits(); } + /** + * Returns the UUID equivalent of {@link #mostSignificantBits()} and {@link #leastSignificantBits()}. + * + * @return UUID. + */ + @TestOnly + public UUID uuid() { Review Comment: We can make this part of the regular (non-test) API here; it might be useful in other situations.
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -17,36 +17,269 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet; /** - * Outgoing snapshot. + * Outgoing snapshot. It corresponds to exactly one partition. + * + * <p>The snapshot has a lock needed for interaction with {@link SnapshotAwarePartitionDataStorage}. 
*/ public class OutgoingSnapshot { + private static final TableMessagesFactory messagesFactory = new TableMessagesFactory(); + + private final UUID id; + + private final PartitionAccess partition; + + private final OutgoingSnapshotRegistry outgoingSnapshotRegistry; + + /** + * Lock that is used for mutual exclusion of snapshot reading (by this class) and threads that write to the same + * partition (currently, via {@link SnapshotAwarePartitionDataStorage}). + */ + private final Lock rowOperationsLock = new ReentrantLock(); + + /** + * {@link RowId}s for which the corresponding rows were sent out of order (relative to the order in which this + * snapshot sends rows), hence they must be skipped when sending rows normally. + */ + private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>(); + + // TODO: IGNITE-17935 - manage queue size + /** + * Rows that need to be sent out of order (relative to the order in which this snapshot sends rows). + * Versions inside rows are in oldest-to-newest order. + */ + private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData = new LinkedList<>(); + + /** + * {@link RowId} used to point (most of the time) to the last processed row. More precisely: + * + * <ul> + * <li>Before we started to read from the partition, this is equal to lowest theoretically possible + * {@link RowId} for this partition</li> + * <li>If we started to read from partition AND it is not yet exhausted, this is the last RowId that was + * sent in this snapshot order</li> + * <li>After we exhausted the partition, this is {@code null}</li> + * </ul> + */ + private RowId lastRowId; + + private boolean startedToReadPartition = false; + + /** + * This becomes {@code true} as soon as we exhaust both the partition and out-of-order queue. + */ + private boolean finished = false; + + /** + * Creates a new instance. 
+ */ + public OutgoingSnapshot(UUID id, PartitionAccess partition, OutgoingSnapshotRegistry outgoingSnapshotRegistry) { + this.id = id; + this.partition = partition; + this.outgoingSnapshotRegistry = outgoingSnapshotRegistry; + + lastRowId = partition.minRowId(); + } + + /** + * Returns the ID of this snapshot. + */ + public UUID id() { + return id; + } + + /** + * Returns the key of the corresponding partition. + * + * @return Partition key. + */ + public PartitionKey partitionKey() { + return partition.key(); + } + /** * Reads a snapshot meta and returns a future with the response. * * @param metaRequest Meta request. */ CompletableFuture<SnapshotMetaResponse> handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 + //TODO https://issues.apache.org/jira/browse/IGNITE-17935 return null; } /** * Reads chunk of partition data and returns a future with the response. * - * @param mvDataRequest Data request. + * @param request Data request. */ - CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 - return null; + CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest request) { + // TODO: IGNITE-17935 - executor? 
+ + assert !finished; + + long totalBatchSize = 0; + List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>(); + + while (true) { + acquireLock(); + + try { + totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, request); + + totalBatchSize = tryProcessRowFromPartition(batch, totalBatchSize, request); + + if (exhaustedPartition() && outOfOrderMvData.isEmpty()) { + finished = true; + } + + if (finished || batchIsFull(request, totalBatchSize)) { + break; + } + } finally { + releaseLock(); + } + } + + // We unregister itself outside the lock to avoid a deadlock, because SnapshotAwareMvPartitionStorage takes + // locks in partitionSnapshots.lock -> snapshot.lock; if we did it under lock, we would take locks in the + // opposite order. That's why we need finished flag and cooperation with SnapshotAwareMvPartitionStorage + // (calling our isFinished()). + if (finished) { + outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id); + } + + SnapshotMvDataResponse response = messagesFactory.snapshotMvDataResponse() + .rows(batch) + .finish(finished) + .build(); + return CompletableFuture.completedFuture(response); Review Comment: ```suggestion .build(); return CompletableFuture.completedFuture(response); ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -17,36 +17,269 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.ReadResult; +import 
org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet; /** - * Outgoing snapshot. + * Outgoing snapshot. It corresponds to exactly one partition. + * + * <p>The snapshot has a lock needed for interaction with {@link SnapshotAwarePartitionDataStorage}. */ public class OutgoingSnapshot { + private static final TableMessagesFactory messagesFactory = new TableMessagesFactory(); + + private final UUID id; + + private final PartitionAccess partition; + + private final OutgoingSnapshotRegistry outgoingSnapshotRegistry; + + /** + * Lock that is used for mutual exclusion of snapshot reading (by this class) and threads that write to the same + * partition (currently, via {@link SnapshotAwarePartitionDataStorage}). + */ + private final Lock rowOperationsLock = new ReentrantLock(); + + /** + * {@link RowId}s for which the corresponding rows were sent out of order (relative to the order in which this + * snapshot sends rows), hence they must be skipped when sending rows normally. 
+ */ + private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>(); + + // TODO: IGNITE-17935 - manage queue size + /** + * Rows that need to be sent out of order (relative to the order in which this snapshot sends rows). + * Versions inside rows are in oldest-to-newest order. + */ + private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData = new LinkedList<>(); + + /** + * {@link RowId} used to point (most of the time) to the last processed row. More precisely: + * + * <ul> + * <li>Before we started to read from the partition, this is equal to lowest theoretically possible + * {@link RowId} for this partition</li> + * <li>If we started to read from partition AND it is not yet exhausted, this is the last RowId that was + * sent in this snapshot order</li> + * <li>After we exhausted the partition, this is {@code null}</li> + * </ul> + */ + private RowId lastRowId; + + private boolean startedToReadPartition = false; + + /** + * This becomes {@code true} as soon as we exhaust both the partition and out-of-order queue. + */ + private boolean finished = false; + + /** + * Creates a new instance. + */ + public OutgoingSnapshot(UUID id, PartitionAccess partition, OutgoingSnapshotRegistry outgoingSnapshotRegistry) { + this.id = id; + this.partition = partition; + this.outgoingSnapshotRegistry = outgoingSnapshotRegistry; + + lastRowId = partition.minRowId(); + } + + /** + * Returns the ID of this snapshot. + */ + public UUID id() { + return id; + } + + /** + * Returns the key of the corresponding partition. + * + * @return Partition key. + */ + public PartitionKey partitionKey() { + return partition.key(); + } + /** * Reads a snapshot meta and returns a future with the response. * * @param metaRequest Meta request. 
*/ CompletableFuture<SnapshotMetaResponse> handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 + //TODO https://issues.apache.org/jira/browse/IGNITE-17935 return null; } /** * Reads chunk of partition data and returns a future with the response. * - * @param mvDataRequest Data request. + * @param request Data request. */ - CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 - return null; + CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest request) { + // TODO: IGNITE-17935 - executor? + + assert !finished; + + long totalBatchSize = 0; + List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>(); + + while (true) { + acquireLock(); + + try { + totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, request); + + totalBatchSize = tryProcessRowFromPartition(batch, totalBatchSize, request); + + if (exhaustedPartition() && outOfOrderMvData.isEmpty()) { + finished = true; + } + + if (finished || batchIsFull(request, totalBatchSize)) { + break; + } + } finally { + releaseLock(); + } + } + + // We unregister itself outside the lock to avoid a deadlock, because SnapshotAwareMvPartitionStorage takes + // locks in partitionSnapshots.lock -> snapshot.lock; if we did it under lock, we would take locks in the + // opposite order. That's why we need finished flag and cooperation with SnapshotAwareMvPartitionStorage + // (calling our isFinished()). 
+ if (finished) { + outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id); + } + + SnapshotMvDataResponse response = messagesFactory.snapshotMvDataResponse() + .rows(batch) + .finish(finished) + .build(); + return CompletableFuture.completedFuture(response); + } + + private long fillWithOutOfOrderRows( + List<SnapshotMvDataResponse.ResponseEntry> rowEntries, + long totalBytesBefore, + SnapshotMvDataRequest request + ) { + long totalBytesAfter = totalBytesBefore; + + while (totalBytesAfter < request.batchSizeHint()) { + SnapshotMvDataResponse.ResponseEntry rowEntry = outOfOrderMvData.poll(); + + if (rowEntry == null) { + break; + } + + rowEntries.add(rowEntry); + totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions()); + } + + return totalBytesAfter; + } + + private long rowSizeInBytes(List<ByteBuffer> rowVersions) { + long sum = 0; + + for (ByteBuffer buf : rowVersions) { + if (buf != null) { + sum += buf.limit(); + } + } + + return sum; + } + + private long tryProcessRowFromPartition(List<SnapshotMvDataResponse.ResponseEntry> batch, long totalBatchSize, + SnapshotMvDataRequest request) { + if (batchIsFull(request, totalBatchSize) || exhaustedPartition()) { + return totalBatchSize; + } + + if (!startedToReadPartition) { + lastRowId = partition.closestRowId(lastRowId); + + startedToReadPartition = true; + } else { + lastRowId = partition.closestRowId(lastRowId.increment()); + } + + if (!exhaustedPartition()) { + if (!overwrittenRowIds.remove(lastRowId)) { + List<ReadResult> rowVersions = new ArrayList<>(partition.rowVersions(lastRowId)); + Collections.reverse(rowVersions); + SnapshotMvDataResponse.ResponseEntry rowEntry = rowEntry(lastRowId, rowVersions); + + batch.add(rowEntry); + + totalBatchSize += rowSizeInBytes(rowEntry.rowVersions()); + } + } + + return totalBatchSize; + } + + private boolean batchIsFull(SnapshotMvDataRequest request, long totalBatchSize) { + return totalBatchSize >= request.batchSizeHint(); + } + + private boolean exhaustedPartition() { + 
return lastRowId == null; + } + + private SnapshotMvDataResponse.ResponseEntry rowEntry(RowId rowId, List<ReadResult> rowVersions) { + List<ByteBuffer> buffers = new ArrayList<>(); Review Comment: Can we pass a size? Or would that be a premature optimization? I suspect that these chains would usually be pretty short, way shorter than the default capacity of 10. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java: ########## @@ -112,6 +153,7 @@ private void messageHandler(NetworkMessage networkMessage, NetworkAddress sender CompletableFuture<? extends NetworkMessage> responseFuture = handleSnapshotRequestMessage(networkMessage, outgoingSnapshot); if (responseFuture != null) { + //TODO: IGNITE-17935 - whenComplete()? handle()? Should we analyze the first exception at all? Review Comment: I guess we should analyze the exception. If something went wrong, we should act. I don't know how, though. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -17,36 +17,269 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; +import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet; /** - * Outgoing snapshot. + * Outgoing snapshot. It corresponds to exactly one partition. + * + * <p>The snapshot has a lock needed for interaction with {@link SnapshotAwarePartitionDataStorage}. */ public class OutgoingSnapshot { + private static final TableMessagesFactory messagesFactory = new TableMessagesFactory(); + + private final UUID id; + + private final PartitionAccess partition; + + private final OutgoingSnapshotRegistry outgoingSnapshotRegistry; + + /** + * Lock that is used for mutual exclusion of snapshot reading (by this class) and threads that write to the same + * partition (currently, via {@link SnapshotAwarePartitionDataStorage}). + */ + private final Lock rowOperationsLock = new ReentrantLock(); + + /** + * {@link RowId}s for which the corresponding rows were sent out of order (relative to the order in which this + * snapshot sends rows), hence they must be skipped when sending rows normally. + */ + private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>(); + + // TODO: IGNITE-17935 - manage queue size + /** + * Rows that need to be sent out of order (relative to the order in which this snapshot sends rows). + * Versions inside rows are in oldest-to-newest order. 
+ */ + private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData = new LinkedList<>(); + + /** + * {@link RowId} used to point (most of the time) to the last processed row. More precisely: + * + * <ul> + * <li>Before we started to read from the partition, this is equal to lowest theoretically possible + * {@link RowId} for this partition</li> + * <li>If we started to read from partition AND it is not yet exhausted, this is the last RowId that was + * sent in this snapshot order</li> + * <li>After we exhausted the partition, this is {@code null}</li> + * </ul> + */ + private RowId lastRowId; + + private boolean startedToReadPartition = false; + + /** + * This becomes {@code true} as soon as we exhaust both the partition and out-of-order queue. + */ + private boolean finished = false; + + /** + * Creates a new instance. + */ + public OutgoingSnapshot(UUID id, PartitionAccess partition, OutgoingSnapshotRegistry outgoingSnapshotRegistry) { + this.id = id; + this.partition = partition; + this.outgoingSnapshotRegistry = outgoingSnapshotRegistry; + + lastRowId = partition.minRowId(); + } + + /** + * Returns the ID of this snapshot. + */ + public UUID id() { + return id; + } + + /** + * Returns the key of the corresponding partition. + * + * @return Partition key. + */ + public PartitionKey partitionKey() { + return partition.key(); + } + /** * Reads a snapshot meta and returns a future with the response. * * @param metaRequest Meta request. */ CompletableFuture<SnapshotMetaResponse> handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 + //TODO https://issues.apache.org/jira/browse/IGNITE-17935 return null; } /** * Reads chunk of partition data and returns a future with the response. * - * @param mvDataRequest Data request. + * @param request Data request. 
*/ - CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 - return null; + CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest request) { + // TODO: IGNITE-17935 - executor? + + assert !finished; + + long totalBatchSize = 0; + List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>(); + + while (true) { + acquireLock(); + + try { + totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, request); + + totalBatchSize = tryProcessRowFromPartition(batch, totalBatchSize, request); + + if (exhaustedPartition() && outOfOrderMvData.isEmpty()) { + finished = true; + } + + if (finished || batchIsFull(request, totalBatchSize)) { + break; + } + } finally { + releaseLock(); + } + } + + // We unregister itself outside the lock to avoid a deadlock, because SnapshotAwareMvPartitionStorage takes + // locks in partitionSnapshots.lock -> snapshot.lock; if we did it under lock, we would take locks in the + // opposite order. That's why we need finished flag and cooperation with SnapshotAwareMvPartitionStorage + // (calling our isFinished()). 
+ if (finished) { + outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id); + } + + SnapshotMvDataResponse response = messagesFactory.snapshotMvDataResponse() + .rows(batch) + .finish(finished) + .build(); + return CompletableFuture.completedFuture(response); + } + + private long fillWithOutOfOrderRows( + List<SnapshotMvDataResponse.ResponseEntry> rowEntries, + long totalBytesBefore, + SnapshotMvDataRequest request + ) { + long totalBytesAfter = totalBytesBefore; + + while (totalBytesAfter < request.batchSizeHint()) { + SnapshotMvDataResponse.ResponseEntry rowEntry = outOfOrderMvData.poll(); + + if (rowEntry == null) { + break; + } + + rowEntries.add(rowEntry); + totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions()); + } + + return totalBytesAfter; + } + + private long rowSizeInBytes(List<ByteBuffer> rowVersions) { + long sum = 0; + + for (ByteBuffer buf : rowVersions) { + if (buf != null) { + sum += buf.limit(); + } + } + + return sum; + } + + private long tryProcessRowFromPartition(List<SnapshotMvDataResponse.ResponseEntry> batch, long totalBatchSize, + SnapshotMvDataRequest request) { + if (batchIsFull(request, totalBatchSize) || exhaustedPartition()) { + return totalBatchSize; + } + + if (!startedToReadPartition) { + lastRowId = partition.closestRowId(lastRowId); + + startedToReadPartition = true; + } else { + lastRowId = partition.closestRowId(lastRowId.increment()); + } + + if (!exhaustedPartition()) { + if (!overwrittenRowIds.remove(lastRowId)) { + List<ReadResult> rowVersions = new ArrayList<>(partition.rowVersions(lastRowId)); + Collections.reverse(rowVersions); + SnapshotMvDataResponse.ResponseEntry rowEntry = rowEntry(lastRowId, rowVersions); + + batch.add(rowEntry); + + totalBatchSize += rowSizeInBytes(rowEntry.rowVersions()); + } + } + + return totalBatchSize; + } + + private boolean batchIsFull(SnapshotMvDataRequest request, long totalBatchSize) { + return totalBatchSize >= request.batchSizeHint(); + } + + private boolean exhaustedPartition() { + 
return lastRowId == null; + } + + private SnapshotMvDataResponse.ResponseEntry rowEntry(RowId rowId, List<ReadResult> rowVersions) { + List<ByteBuffer> buffers = new ArrayList<>(); + List<HybridTimestamp> commitTimestamps = new ArrayList<>(); + UUID transactionId = null; + UUID commitTableId = null; + int commitPartitionId = ReadResult.UNDEFINED_COMMIT_PARTITION_ID; + + for (ReadResult version : rowVersions) { + BinaryRow row = version.binaryRow(); + + buffers.add(row == null ? null : row.byteBuffer()); + + if (version.isWriteIntent()) { + transactionId = version.transactionId(); + commitTableId = version.commitTableId(); + commitPartitionId = version.commitPartitionId(); + } else { + commitTimestamps.add(version.commitTimestamp()); + } + } + + return messagesFactory.responseEntry() + .rowId(new UUID(rowId.mostSignificantBits(), rowId.leastSignificantBits())) Review Comment: This is what I meant. `uuid()` method may be useful ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -55,7 +288,76 @@ CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMv * @param txDataRequest Data request. */ CompletableFuture<SnapshotTxDataResponse> handleSnapshotTxDataRequest(SnapshotTxDataRequest txDataRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 + //TODO https://issues.apache.org/jira/browse/IGNITE-17935 return null; } + + /** + * Acquires this snapshot lock. + */ + public void acquireLock() { + rowOperationsLock.lock(); + } + + /** + * Releases this snapshot lock. + */ + public void releaseLock() { + rowOperationsLock.unlock(); + } + + /** + * Whether this snapshot is finished (i.e. it already sent all the MV data and is not going to send anything else). + * + * <p>Must be called under snapshot lock. + * + * @return {@code true} if finished. 
+ */ + public boolean isFinished() { + return finished; + } + + /** + * Adds a {@link RowId} to the collection of IDs that need to be skipped during normal snapshot row sending. + * + * <p>Must be called under snapshot lock. + * + * @param rowId RowId to add. + * @return {@code true} if the given RowId was added as it was not yet in the collection of IDs to skip. + */ + public boolean addOverwrittenRowId(RowId rowId) { + return overwrittenRowIds.add(rowId); + } + + /** + * Returns {@code true} if the given {@link RowId} does not interfere with the rows that this snapshot is going + * to be sent in the normal snapshot rows sending order. + * + * <p>Must be called under snapshot lock. + * + * @param rowId RowId. + * @return {@code true} if the given RowId is already passed by the snapshot in normal rows sending order. + */ + public boolean alreadyPassed(RowId rowId) { + if (!startedToReadPartition) { + return false; + } + if (exhaustedPartition()) { + return true; + } + + return rowId.compareTo(lastRowId) <= 0; + } + + /** + * Enqueues a row for out-of-order sending. + * + * <p>Must be called under snapshot lock. + * + * @param rowId {@link RowId} of the row. + * @param rowVersions Versions of the row (oldest to newest). + */ + public void enqueueForSending(RowId rowId, List<ReadResult> rowVersions) { Review Comment: I see an inconsistency here. Regular sends read data in this class, but RAFT commands must read rows themselves before appending them to the queue. This is a little bizarre; can we just pass the RowId here and do all the magic inside? ########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/TestPartitionDataStorage.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.distributed; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.lock.AutoLockup; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageException; +import org.apache.ignite.internal.storage.TxIdMismatchException; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.jetbrains.annotations.Nullable; + +/** + * Test implementation of {@link PartitionDataStorage}. 
+ */ +public class TestPartitionDataStorage implements PartitionDataStorage { + private final MvPartitionStorage partitionStorage; + + private final Lock partitionSnapshotsLock = new ReentrantLock(); + + public TestPartitionDataStorage(MvPartitionStorage partitionStorage) { + this.partitionStorage = partitionStorage; + } + + @Override + public <V> V runConsistently(WriteClosure<V> closure) throws StorageException { + return partitionStorage.runConsistently(closure); + } + + @SuppressWarnings("Convert2Lambda") + @Override + public AutoLockup acquirePartitionSnapshotsReadLock() { + partitionSnapshotsLock.lock(); + + return new AutoLockup() { + @Override + public void close() { + partitionSnapshotsLock.unlock(); + } + }; + } + + @Override + public CompletableFuture<Void> flush() { + return partitionStorage.flush(); + } + + @Override + public long lastAppliedIndex() { + return partitionStorage.lastAppliedIndex(); + } + + @Override + public void lastAppliedIndex(long lastAppliedIndex) throws StorageException { + partitionStorage.lastAppliedIndex(lastAppliedIndex); + } + + @Override + public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, + int commitPartitionId) throws TxIdMismatchException, StorageException { + return partitionStorage.addWrite(rowId, row, txId, commitTableId, commitPartitionId); + } + + @Override + public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException { + return partitionStorage.abortWrite(rowId); + } + + @Override + public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException { + partitionStorage.commitWrite(rowId, timestamp); + } + + @Override + public MvPartitionStorage getStorage() { + return partitionStorage; + } + + @Override + public void close() throws Exception { Review Comment: I don't know what you're going to do with this information honestly ########## 
modules/table/src/integrationTest/java/org/apache/ignite/distributed/TestPartitionDataStorage.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.distributed; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.lock.AutoLockup; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageException; +import org.apache.ignite.internal.storage.TxIdMismatchException; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.jetbrains.annotations.Nullable; + +/** + * Test implementation of {@link PartitionDataStorage}. 
*/ +public class TestPartitionDataStorage implements PartitionDataStorage { + private final MvPartitionStorage partitionStorage; + + private final Lock partitionSnapshotsLock = new ReentrantLock(); + + public TestPartitionDataStorage(MvPartitionStorage partitionStorage) { + this.partitionStorage = partitionStorage; + } + + @Override + public <V> V runConsistently(WriteClosure<V> closure) throws StorageException { + return partitionStorage.runConsistently(closure); + } + + @SuppressWarnings("Convert2Lambda") Review Comment: But why not use a lambda? It looks convenient to me. BTW, I'd advise annotating AutoLockup with `@FunctionalInterface`. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java: ########## @@ -64,7 +64,8 @@ interface ResponseEntry extends NetworkMessage { /** Commit table id for write-intent if it's present. */ @Nullable UUID commitTableId(); - /** Commit partition id for write-intent if it's present. {@code -1} otherwise. */ + /** Commit partition id for write-intent if it's present. + * {@link org.apache.ignite.internal.storage.ReadResult#UNDEFINED_COMMIT_PARTITION_ID} otherwise. */ Review Comment: The short name would look nicer; can you add this class to the imports? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.lock.AutoLockup; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageException; +import org.apache.ignite.internal.storage.TxIdMismatchException; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; + +/** + * Provides access to both MV (multi-version) data and transactions data of a partition. + * + * <p>Methods writing to MV storage ({@link #addWrite(RowId, BinaryRow, UUID, UUID, int)}, {@link #abortWrite(RowId)} + * and {@link #commitWrite(RowId, HybridTimestamp)}) and TX data storage MUST be invoked under a lock acquired using + * {@link #acquirePartitionSnapshotsReadLock()}. + * + * <p>Each MvPartitionStorage instance represents exactly one partition. All RowIds within a partition are sorted consistently with the + * {@link RowId#compareTo} comparison order. 
+ * + * @see org.apache.ignite.internal.storage.MvPartitionStorage + * @see org.apache.ignite.internal.tx.storage.state.TxStateStorage + */ +public interface PartitionDataStorage extends AutoCloseable { + /** + * Executes {@link WriteClosure} atomically, meaning that partial result of an incomplete closure will never be written to the + * physical device, thus guaranteeing data consistency after restart. Simply runs the closure in case of a volatile storage. + * + * @param closure Data access closure to be executed. + * @param <V> Type of the result returned from the closure. + * @return Closure result. + * @throws StorageException If failed to write data to the storage. + */ Review Comment: You should probably add links like `@see MvPartitionStorage#runConsistently(WriteClosure)` in all methods ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/MvStoragePartitionAccess.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.util.Cursor; +import org.jetbrains.annotations.Nullable; + +/** + * {@link PartitionAccess} that adapts an {@link MvPartitionStorage}. + */ +public class MvStoragePartitionAccess implements PartitionAccess { + private final PartitionKey partitionKey; + private final MvPartitionStorage partitionStorage; + + public MvStoragePartitionAccess(PartitionKey partitionKey, MvPartitionStorage partitionStorage) { + this.partitionKey = partitionKey; + this.partitionStorage = partitionStorage; + } + + @Override + public PartitionKey key() { + return partitionKey; + } + + @Override + public long persistedIndex() { + return partitionStorage.persistedIndex(); + } + + @Override + public RowId minRowId() { + return RowId.lowestRowId(partitionKey.partitionId()); + } + + @Override + public @Nullable RowId closestRowId(RowId lowerBound) { + return partitionStorage.closestRowId(lowerBound); + } + + @Override + public List<ReadResult> rowVersions(RowId rowId) { + try (Cursor<ReadResult> cursor = partitionStorage.scanVersions(rowId)) { + List<ReadResult> versions = new ArrayList<>(); + + for (ReadResult version : cursor) { + versions.add(version); + } + + return List.copyOf(versions); Review Comment: What's the idea behind `copyOf` here? It's safe to return a mutable collection. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/MvStoragePartitionAccess.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.util.Cursor; +import org.jetbrains.annotations.Nullable; + +/** + * {@link PartitionAccess} that adapts an {@link MvPartitionStorage}. + */ +public class MvStoragePartitionAccess implements PartitionAccess { + private final PartitionKey partitionKey; + private final MvPartitionStorage partitionStorage; + + public MvStoragePartitionAccess(PartitionKey partitionKey, MvPartitionStorage partitionStorage) { + this.partitionKey = partitionKey; + this.partitionStorage = partitionStorage; + } + + @Override + public PartitionKey key() { + return partitionKey; + } + + @Override + public long persistedIndex() { + return partitionStorage.persistedIndex(); + } + + @Override + public RowId minRowId() { Review Comment: I think that this method shouldn't be here, because it has the same implementation everywhere.
Just make a `partitionId()` and that's it EDIT I see that there's a "partitionKey" that contains partition id ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -17,36 +17,269 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet; /** - * Outgoing snapshot. + * Outgoing snapshot. It corresponds to exactly one partition. + * + * <p>The snapshot has a lock needed for interaction with {@link SnapshotAwarePartitionDataStorage}. 
*/ public class OutgoingSnapshot { + private static final TableMessagesFactory messagesFactory = new TableMessagesFactory(); Review Comment: So it's `static final` after all :) Unorthodox naming, should be upper case ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot; + +import java.util.Objects; +import java.util.UUID; + +/** + * Uniquely identifies a partition. This is a pair of internal table ID and partition number (aka partition ID). + */ +public class PartitionKey { + private final UUID tableId; + private final int partitionId; + + /** + * Returns partition ID. + * + * @return Partition ID. + */ + public int partitionId() { + return partitionId; + } + + /** + * Constructs a new partition key. 
+ */ + public PartitionKey(UUID tableId, int partitionId) { + Objects.requireNonNull(tableId, "tableId cannot be null"); + + this.tableId = tableId; + this.partitionId = partitionId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionKey that = (PartitionKey) o; + return partitionId == that.partitionId && tableId.equals(that.tableId); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, partitionId); + } + + @Override + public String toString() { + return "PartitionKey{" Review Comment: Looks like an auto-generated code from IDEA. We have a different format, you can use utility class S or just change the code ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -860,8 +881,8 @@ private RaftGroupOptions groupOptionsForPartition( raftGroupOptions.snapshotStorageFactory(new PartitionSnapshotStorageFactory( raftMgr.topologyService(), //TODO IGNITE-17302 Use miniumum from mv storage and tx state storage. - new OutgoingSnapshotsManager(raftMgr.messagingService()), - partitionStorage::persistedIndex, + outgoingSnapshotsManager, + new MvStoragePartitionAccess(partitionKey(internalTbl, partId), partitionStorage), Review Comment: Please ask @alievmirza for his PR to validate what he did in this exact line, so that you both know what's happening. He showed an interest in what you're doing btw ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -55,7 +288,76 @@ CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMv * @param txDataRequest Data request. 
*/ CompletableFuture<SnapshotTxDataResponse> handleSnapshotTxDataRequest(SnapshotTxDataRequest txDataRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 + //TODO https://issues.apache.org/jira/browse/IGNITE-17935 return null; } + + /** + * Acquires this snapshot lock. + */ + public void acquireLock() { + rowOperationsLock.lock(); + } + + /** + * Releases this snapshot lock. + */ + public void releaseLock() { + rowOperationsLock.unlock(); + } + + /** + * Whether this snapshot is finished (i.e. it already sent all the MV data and is not going to send anything else). + * + * <p>Must be called under snapshot lock. + * + * @return {@code true} if finished. + */ + public boolean isFinished() { + return finished; + } + + /** + * Adds a {@link RowId} to the collection of IDs that need to be skipped during normal snapshot row sending. + * + * <p>Must be called under snapshot lock. + * + * @param rowId RowId to add. + * @return {@code true} if the given RowId was added as it was not yet in the collection of IDs to skip. + */ + public boolean addOverwrittenRowId(RowId rowId) { Review Comment: I see that you decided to leave the name from the description. Is it good enough?
To be honest, I didn't think too hard before choosing it ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -17,36 +17,269 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet; /** - * Outgoing snapshot. + * Outgoing snapshot. It corresponds to exactly one partition. + * + * <p>The snapshot has a lock needed for interaction with {@link SnapshotAwarePartitionDataStorage}.
*/ public class OutgoingSnapshot { + private static final TableMessagesFactory messagesFactory = new TableMessagesFactory(); + + private final UUID id; + + private final PartitionAccess partition; + + private final OutgoingSnapshotRegistry outgoingSnapshotRegistry; + + /** + * Lock that is used for mutual exclusion of snapshot reading (by this class) and threads that write to the same + * partition (currently, via {@link SnapshotAwarePartitionDataStorage}). + */ + private final Lock rowOperationsLock = new ReentrantLock(); + + /** + * {@link RowId}s for which the corresponding rows were sent out of order (relative to the order in which this + * snapshot sends rows), hence they must be skipped when sending rows normally. + */ + private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>(); + + // TODO: IGNITE-17935 - manage queue size + /** + * Rows that need to be sent out of order (relative to the order in which this snapshot sends rows). + * Versions inside rows are in oldest-to-newest order. + */ + private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData = new LinkedList<>(); + + /** + * {@link RowId} used to point (most of the time) to the last processed row. More precisely: + * + * <ul> + * <li>Before we started to read from the partition, this is equal to lowest theoretically possible + * {@link RowId} for this partition</li> + * <li>If we started to read from partition AND it is not yet exhausted, this is the last RowId that was + * sent in this snapshot order</li> + * <li>After we exhausted the partition, this is {@code null}</li> + * </ul> + */ + private RowId lastRowId; + + private boolean startedToReadPartition = false; + + /** + * This becomes {@code true} as soon as we exhaust both the partition and out-of-order queue. + */ + private boolean finished = false; + + /** + * Creates a new instance. 
+ */ + public OutgoingSnapshot(UUID id, PartitionAccess partition, OutgoingSnapshotRegistry outgoingSnapshotRegistry) { + this.id = id; + this.partition = partition; + this.outgoingSnapshotRegistry = outgoingSnapshotRegistry; + + lastRowId = partition.minRowId(); + } + + /** + * Returns the ID of this snapshot. + */ + public UUID id() { + return id; + } + + /** + * Returns the key of the corresponding partition. + * + * @return Partition key. + */ + public PartitionKey partitionKey() { + return partition.key(); + } + /** * Reads a snapshot meta and returns a future with the response. * * @param metaRequest Meta request. */ CompletableFuture<SnapshotMetaResponse> handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 + //TODO https://issues.apache.org/jira/browse/IGNITE-17935 return null; } /** * Reads chunk of partition data and returns a future with the response. * - * @param mvDataRequest Data request. + * @param request Data request. */ - CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 - return null; + CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest request) { + // TODO: IGNITE-17935 - executor? 
+ + assert !finished; + + long totalBatchSize = 0; + List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>(); + + while (true) { + acquireLock(); + + try { + totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, request); + + totalBatchSize = tryProcessRowFromPartition(batch, totalBatchSize, request); + + if (exhaustedPartition() && outOfOrderMvData.isEmpty()) { + finished = true; + } + + if (finished || batchIsFull(request, totalBatchSize)) { + break; + } + } finally { + releaseLock(); + } + } + + // We unregister itself outside the lock to avoid a deadlock, because SnapshotAwareMvPartitionStorage takes + // locks in partitionSnapshots.lock -> snapshot.lock; if we did it under lock, we would take locks in the + // opposite order. That's why we need finished flag and cooperation with SnapshotAwareMvPartitionStorage + // (calling our isFinished()). + if (finished) { + outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id); + } + + SnapshotMvDataResponse response = messagesFactory.snapshotMvDataResponse() + .rows(batch) + .finish(finished) + .build(); + return CompletableFuture.completedFuture(response); + } + + private long fillWithOutOfOrderRows( + List<SnapshotMvDataResponse.ResponseEntry> rowEntries, + long totalBytesBefore, + SnapshotMvDataRequest request + ) { + long totalBytesAfter = totalBytesBefore; + + while (totalBytesAfter < request.batchSizeHint()) { + SnapshotMvDataResponse.ResponseEntry rowEntry = outOfOrderMvData.poll(); + + if (rowEntry == null) { + break; + } + + rowEntries.add(rowEntry); + totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions()); + } + + return totalBytesAfter; + } + + private long rowSizeInBytes(List<ByteBuffer> rowVersions) { + long sum = 0; + + for (ByteBuffer buf : rowVersions) { + if (buf != null) { + sum += buf.limit(); Review Comment: Technically, you should subtract a position, but our rules are very vague here. Maybe position must be 0 here, who knows?
We'll figure it out one day ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.lock.AutoLockup; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageException; +import org.apache.ignite.internal.storage.TxIdMismatchException; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; +import org.apache.ignite.internal.util.Cursor; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; + +/** + * {@link MvPartitionStorage} decorator that adds snapshot awareness. This means that writes coordinate with ongoing + * snapshots to make sure that the writes do not interfere with the snapshots. + */ +public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage { Review Comment: Like here. What's the point of having two almost identical implementations? 
This can be a simple class, without dedicated interface that comes with it ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -17,36 +17,269 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet; /** - * Outgoing snapshot. + * Outgoing snapshot. It corresponds to exactly one partition. + * + * <p>The snapshot has a lock needed for interaction with {@link SnapshotAwarePartitionDataStorage}. 
*/ public class OutgoingSnapshot { + private static final TableMessagesFactory messagesFactory = new TableMessagesFactory(); + + private final UUID id; + + private final PartitionAccess partition; + + private final OutgoingSnapshotRegistry outgoingSnapshotRegistry; + + /** + * Lock that is used for mutual exclusion of snapshot reading (by this class) and threads that write to the same + * partition (currently, via {@link SnapshotAwarePartitionDataStorage}). + */ + private final Lock rowOperationsLock = new ReentrantLock(); + + /** + * {@link RowId}s for which the corresponding rows were sent out of order (relative to the order in which this + * snapshot sends rows), hence they must be skipped when sending rows normally. + */ + private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>(); + + // TODO: IGNITE-17935 - manage queue size + /** + * Rows that need to be sent out of order (relative to the order in which this snapshot sends rows). + * Versions inside rows are in oldest-to-newest order. + */ + private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData = new LinkedList<>(); + + /** + * {@link RowId} used to point (most of the time) to the last processed row. More precisely: + * + * <ul> + * <li>Before we started to read from the partition, this is equal to lowest theoretically possible + * {@link RowId} for this partition</li> + * <li>If we started to read from partition AND it is not yet exhausted, this is the last RowId that was + * sent in this snapshot order</li> + * <li>After we exhausted the partition, this is {@code null}</li> + * </ul> + */ + private RowId lastRowId; + + private boolean startedToReadPartition = false; + + /** + * This becomes {@code true} as soon as we exhaust both the partition and out-of-order queue. + */ + private boolean finished = false; + + /** + * Creates a new instance. 
+ */ + public OutgoingSnapshot(UUID id, PartitionAccess partition, OutgoingSnapshotRegistry outgoingSnapshotRegistry) { + this.id = id; + this.partition = partition; + this.outgoingSnapshotRegistry = outgoingSnapshotRegistry; + + lastRowId = partition.minRowId(); + } + + /** + * Returns the ID of this snapshot. + */ + public UUID id() { + return id; + } + + /** + * Returns the key of the corresponding partition. + * + * @return Partition key. + */ + public PartitionKey partitionKey() { + return partition.key(); + } + /** * Reads a snapshot meta and returns a future with the response. * * @param metaRequest Meta request. */ CompletableFuture<SnapshotMetaResponse> handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 + //TODO https://issues.apache.org/jira/browse/IGNITE-17935 return null; } /** * Reads chunk of partition data and returns a future with the response. * - * @param mvDataRequest Data request. + * @param request Data request. */ - CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 - return null; + CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest request) { + // TODO: IGNITE-17935 - executor? 
+ + assert !finished; + + long totalBatchSize = 0; + List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>(); + + while (true) { + acquireLock(); + + try { + totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, request); + + totalBatchSize = tryProcessRowFromPartition(batch, totalBatchSize, request); + + if (exhaustedPartition() && outOfOrderMvData.isEmpty()) { + finished = true; + } + + if (finished || batchIsFull(request, totalBatchSize)) { + break; + } + } finally { + releaseLock(); + } + } + + // We unregister itself outside the lock to avoid a deadlock, because SnapshotAwareMvPartitionStorage takes + // locks in partitionSnapshots.lock -> snapshot.lock; if we did it under lock, we would take locks in the + // opposite order. That's why we need finished flag and cooperation with SnapshotAwareMvPartitionStorage + // (calling our isFinished()). + if (finished) { + outgoingSnapshotRegistry.unregisterOutgoingSnapshot(id); + } + + SnapshotMvDataResponse response = messagesFactory.snapshotMvDataResponse() + .rows(batch) + .finish(finished) + .build(); + return CompletableFuture.completedFuture(response); + } + + private long fillWithOutOfOrderRows( + List<SnapshotMvDataResponse.ResponseEntry> rowEntries, + long totalBytesBefore, + SnapshotMvDataRequest request + ) { + long totalBytesAfter = totalBytesBefore; + + while (totalBytesAfter < request.batchSizeHint()) { + SnapshotMvDataResponse.ResponseEntry rowEntry = outOfOrderMvData.poll(); + + if (rowEntry == null) { + break; + } + + rowEntries.add(rowEntry); + totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions()); + } + + return totalBytesAfter; + } + + private long rowSizeInBytes(List<ByteBuffer> rowVersions) { + long sum = 0; + + for (ByteBuffer buf : rowVersions) { + if (buf != null) { + sum += buf.limit(); + } + } + + return sum; + } + + private long tryProcessRowFromPartition(List<SnapshotMvDataResponse.ResponseEntry> batch, long totalBatchSize, + SnapshotMvDataRequest request) { + if 
(batchIsFull(request, totalBatchSize) || exhaustedPartition()) { + return totalBatchSize; + } + + if (!startedToReadPartition) { + lastRowId = partition.closestRowId(lastRowId); + + startedToReadPartition = true; + } else { + lastRowId = partition.closestRowId(lastRowId.increment()); + } + + if (!exhaustedPartition()) { + if (!overwrittenRowIds.remove(lastRowId)) { + List<ReadResult> rowVersions = new ArrayList<>(partition.rowVersions(lastRowId)); + Collections.reverse(rowVersions); + SnapshotMvDataResponse.ResponseEntry rowEntry = rowEntry(lastRowId, rowVersions); + + batch.add(rowEntry); + + totalBatchSize += rowSizeInBytes(rowEntry.rowVersions()); + } + } + + return totalBatchSize; + } + + private boolean batchIsFull(SnapshotMvDataRequest request, long totalBatchSize) { + return totalBatchSize >= request.batchSizeHint(); + } + + private boolean exhaustedPartition() { + return lastRowId == null; + } + + private SnapshotMvDataResponse.ResponseEntry rowEntry(RowId rowId, List<ReadResult> rowVersions) { + List<ByteBuffer> buffers = new ArrayList<>(); + List<HybridTimestamp> commitTimestamps = new ArrayList<>(); + UUID transactionId = null; + UUID commitTableId = null; + int commitPartitionId = ReadResult.UNDEFINED_COMMIT_PARTITION_ID; + + for (ReadResult version : rowVersions) { Review Comment: Ok, I have a complaint. You instantiate a new collection, reverse it and then only use it for iteration. Why do we waste objects? Can we just iterate it backwards? It's fine to assert that list is `RandomAccess` if we need to be 100% sure. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/PartitionsSnapshots.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; + +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; + +/** + * Allows to obtain {@link PartitionSnapshots} instances. + */ +public interface PartitionsSnapshots { Review Comment: Don't you have a feeling that there are too many interfaces? ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java: ########## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.ignite.hlc.HybridClock; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; +import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest; +import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class OutgoingSnapshotTest { + @Mock + private PartitionAccess partitionAccess; + + @Mock + private OutgoingSnapshotRegistry snapshotRegistry; + + private OutgoingSnapshot snapshot; + + private final TableMessagesFactory messagesFactory = new TableMessagesFactory(); + + private 
final RowId lowestRowId = RowId.lowestRowId(1); + private final RowId rowId1 = Objects.requireNonNull(lowestRowId.increment()); + private final RowId rowId2 = Objects.requireNonNull(rowId1.increment()); + private final RowId rowId3 = Objects.requireNonNull(rowId2.increment()); + + private final RowId rowIdOutOfOrder; + + { Review Comment: Why don't you use setUp method? Oh wait, you already have `@BeforeEach`, why did you split it in two sections then? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java: ########## @@ -141,8 +183,50 @@ private CompletableFuture<Void> respond( NetworkAddress sender, Long correlationId ) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 + //TODO https://issues.apache.org/jira/browse/IGNITE-17935 // Handle offline sender and stopped manager. return messagingService.respond(sender, response, correlationId); } + + @Override + public PartitionSnapshots partitionSnapshots(PartitionKey partitionKey) { + return getPartitionSnapshots(partitionKey); + } + + private static class PartitionSnapshotsImpl implements PartitionSnapshots { + private final List<OutgoingSnapshot> snapshots = new ArrayList<>(); + + private final List<OutgoingSnapshot> unmodifiableSnapshotsView = unmodifiableList(snapshots); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReusableLockLockup readLockLockup = new ReusableLockLockup(lock.readLock()); + + private void addUnderLock(OutgoingSnapshot snapshot) { + lock.writeLock().lock(); + + try { + snapshots.add(snapshot); + } finally { + lock.writeLock().unlock(); + } + } + + private void removeUnderLock(OutgoingSnapshot snapshot) { + try { + snapshots.remove(snapshot); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public AutoLockup acquireReadLock() { + return readLockLockup.acquireLock(); + } + + @Override + public List<OutgoingSnapshot> ongoingSnapshots() { + return 
unmodifiableSnapshotsView; + } Review Comment: What's the point of extra field? Are we this paranoid? This is extra code that exists for no good reason, in my opinion. Can be safely replaced with `List<? extends OutgoingSnapshot>`, has the same effect if its caller doesn't use raw types ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -17,36 +17,269 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet; /** - * Outgoing 
snapshot. + * Outgoing snapshot. It corresponds to exactly one partition. + * + * <p>The snapshot has a lock needed for interaction with {@link SnapshotAwareMvPartitionStorage}. */ public class OutgoingSnapshot { + private static final TableMessagesFactory messagesFactory = new TableMessagesFactory(); + + private final UUID id; + + private final PartitionAccess partition; + + private final OutgoingSnapshotRegistry outgoingSnapshotRegistry; + + /** + * Lock that is used for mutual exclusion of snapshot reading (by this class) and threads that write to the same + * partition (currently, via {@link SnapshotAwareMvPartitionStorage}). + */ + private final Lock rowOperationsLock = new ReentrantLock(); + + /** + * {@link RowId}s for which the corresponding rows were sent out of order (relative to the order in which this + * snapshot sends rows), hence they must be skipped when sending rows normally. + */ + private final Set<RowId> overwrittenRowIds = new ConcurrentHashSet<>(); + + // TODO: IGNITE-17935 - manage queue size + /** + * Rows that need to be sent out of order (relative to the order in which this snapshot sends rows). + * Versions inside rows are in oldest-to-newest order. + */ + private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData = new LinkedList<>(); + + /** + * {@link RowId} used to point (most of the time) to the last processed row. More precisely: + * + * <ul> + * <li>Before we started to read from the partition, this is equal to lowest theoretically possible + * {@link RowId} for this partition</li> + * <li>If we started to read from partition AND it is not yet exhausted, this is the last RowId that was + * sent in this snapshot order</li> + * <li>After we exhausted the partition, this is {@code null}</li> + * </ul> + */ + private RowId lastRowId; + + private boolean startedToReadPartition = false; + + /** + * This becomes {@code true} as soon as we exhaust both the partition and out-of-order queue. 
+ */ + private boolean finished = false; + + /** + * Creates a new instance. + */ + public OutgoingSnapshot(UUID id, PartitionAccess partition, OutgoingSnapshotRegistry outgoingSnapshotRegistry) { + this.id = id; + this.partition = partition; + this.outgoingSnapshotRegistry = outgoingSnapshotRegistry; + + lastRowId = partition.minRowId(); + } + + /** + * Returns the ID of this snapshot. + */ + public UUID id() { + return id; + } + + /** + * Returns the key of the corresponding partition. + * + * @return Partition key. + */ + public PartitionKey partitionKey() { + return partition.key(); + } + /** * Reads a snapshot meta and returns a future with the response. * * @param metaRequest Meta request. */ CompletableFuture<SnapshotMetaResponse> handleSnapshotMetaRequest(SnapshotMetaRequest metaRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 + //TODO https://issues.apache.org/jira/browse/IGNITE-17935 return null; } /** * Reads chunk of partition data and returns a future with the response. * - * @param mvDataRequest Data request. + * @param request Data request. */ - CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest mvDataRequest) { - //TODO https://issues.apache.org/jira/browse/IGNITE-17262 - return null; + CompletableFuture<SnapshotMvDataResponse> handleSnapshotMvDataRequest(SnapshotMvDataRequest request) { Review Comment: I agree, this operation should be put into a thread-pool. Currently it's going to be executed in... where? Some network messages processing pool, that's not a good place. ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.hlc.HybridClock; +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.lock.AutoLockup; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; +import org.apache.ignite.internal.util.Cursor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import 
org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SnapshotAwarePartitionDataStorageTest { + @Mock + private MvPartitionStorage partitionStorage; + + @Mock + private PartitionsSnapshots partitionsSnapshots; + + @Spy + private final PartitionKey partitionKey = new PartitionKey(UUID.randomUUID(), 1); + + @InjectMocks + private SnapshotAwarePartitionDataStorage testedStorage; + + @Mock + private PartitionSnapshots partitionSnapshots; + + private final RowId rowId = new RowId(1); Review Comment: The literal `1` is used both here and in `partitionKey` (presumably as the partition ID in both places), so it can be extracted into a constant. ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +import java.util.UUID; +import org.apache.ignite.internal.lock.AutoLockup; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; +import org.apache.ignite.network.MessagingService; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class OutgoingSnapshotsManagerTest { Review Comment: What exactly are we testing here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
