ibessonov commented on code in PR #1246:
URL: https://github.com/apache/ignite-3/pull/1246#discussion_r1005372501


##########
modules/core/src/main/java/org/apache/ignite/internal/lock/ReusableLockLockup.java:
##########
@@ -29,15 +29,20 @@
 public class ReusableLockLockup implements AutoLockup {
     private final Lock lock;
 
-    public ReusableLockLockup(Lock lock) {
+    private ReusableLockLockup(Lock lock) {
         this.lock = lock;
     }
 
+    public static ReusableLockLockup forLock(Lock lock) {

Review Comment:
   How is this necessary? I understand the utility of `List.of` or `Map.of`, 
but here it's the same signature.
   Do we really add this method so that it would look a tiny bit prettier in, 
like, 2 places? It's still `ReusableLockLockup.forLock(lock)`, it's just as 
ugly (I mean the part with "lock(lock)" and other usages of the word)



##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -626,6 +626,19 @@ public static void closeAll(AutoCloseable... closeables) 
throws Exception {
         closeAll(Arrays.stream(closeables));
     }
 
+    /**
+     * Closes an {@link AutoCloseable} ignoring any exception it throws.
+     *
+     * @param closeable The AutoCloseable to close.
+     */
+    public static void closeQuietly(AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception e) {

Review Comment:
   Usually we name there `ignore`, IDEA won't highlight them as problematic in 
this case. It's less irritating this way.
   Commonly used comment is `// No-op.`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -284,44 +292,65 @@ private SnapshotMvDataResponse.ResponseEntry 
rowEntry(RowId rowId) {
     }
 
     /**
-     * Reads chunk of TX states from partition and returns a future with the 
response.
+     * Reads a chunk of TX states from partition and returns a future with the 
response.
      *
-     * @param txDataRequest Data request.
+     * @param request Data request.
      */
-    CompletableFuture<SnapshotTxDataResponse> 
handleSnapshotTxDataRequest(SnapshotTxDataRequest txDataRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
-        return null;
+    CompletableFuture<SnapshotTxDataResponse> 
handleSnapshotTxDataRequest(SnapshotTxDataRequest request) {
+        List<IgniteBiTuple<UUID, TxMeta>> rows = new ArrayList<>();
+
+        while (!finishedTxData && rows.size() < 
request.maxTransactionsInBatch()) {
+            if (txDataCursor.hasNext()) {
+                rows.add(txDataCursor.next());
+            } else {
+                finishedTxData = true;
+                closeQuietly(txDataCursor);
+            }
+        }
+
+        SnapshotTxDataResponse response = buildTxDataResponse(rows, 
finishedTxData);
+
+        return CompletableFuture.completedFuture(response);

Review Comment:
   What thread executes this method? I see no TODO like before. There was a 
separate issue to complete all TODOs, right?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotTxDataRequest.java:
##########
@@ -25,4 +25,10 @@
  */
 @Transferable(TableMessageGroup.SNAPSHOT_TX_DATA_REQUEST)
 public interface SnapshotTxDataRequest extends SnapshotRequestMessage {
+    /**
+     * Returns maximum number of transactions that should be sent in response.
+     *
+     * @return Maximum number of transactions that should be sent in response.
+     */
+    int maxTransactionsInBatch();

Review Comment:
   I mentioned it last time, and again I'm having the same concern. There's no 
way that we will pass different values here in GA. Making properties like this 
one is over-engineering, and argument about unit tests is still pretty weak. 
This exists to be tested, not to be used. I don't like it.
   We can always add something like this in the future if we need it. Now we 
don't need it, can we remove both this and a batch size for MV storage?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java:
##########
@@ -166,11 +174,11 @@ void delegatesAcquirePartitionSnapshotsReadLock() {
     }
 
     @ParameterizedTest
-    @EnumSource(WriteAction.class)
-    void 
writingNotYetPassedRowIdForFirstTimeSendsEnqueuesItOnSnapshotOutOfOrder(WriteAction
 writeAction) {
+    @EnumSource(MvWriteAction.class)
+    void 
writingNotYetPassedRowIdForFirstTimeEnqueuesItOnSnapshotOutOfOrder(MvWriteAction
 writeAction) {

Review Comment:
   Oh my god! Can you move this to the comment? I don't see a sentence here, 
all I see is a bunch of characters that form a big string. This is too much. I 
guess I didn't read these titles during the last review



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java:
##########
@@ -84,33 +92,31 @@ public void lastAppliedIndex(long lastAppliedIndex) throws 
StorageException {
     @Override
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, 
UUID txId, UUID commitTableId,
             int commitPartitionId) throws TxIdMismatchException, 
StorageException {
-        sendRowOutOfOrderToInterferingSnapshots(rowId);
+        sendMvRowOutOfOrderToInterferingSnapshots(rowId);
 
         return partitionStorage.addWrite(rowId, row, txId, commitTableId, 
commitPartitionId);
     }
 
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException 
{
-        sendRowOutOfOrderToInterferingSnapshots(rowId);
+        sendMvRowOutOfOrderToInterferingSnapshots(rowId);
 
         return partitionStorage.abortWrite(rowId);
     }
 
     @Override
     public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
-        sendRowOutOfOrderToInterferingSnapshots(rowId);
+        sendMvRowOutOfOrderToInterferingSnapshots(rowId);

Review Comment:
   This is becoming impossible to read, how are you managing this in your head?
   



##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/UnsignedUuidComparator.java:
##########
@@ -15,26 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+package org.apache.ignite.internal.tx.storage.state;
 
+import java.util.Comparator;
 import java.util.UUID;
 
 /**
- * Registry of {@link OutgoingSnapshot}s.
+ * {@link Comparator} for {@link UUID} instances that orders them interpreting 
them as unsigned 128-bit integers.
  */
-public interface OutgoingSnapshotRegistry {
-    /**
-     * Register a snapshot with the registry.
-     *
-     * @param snapshotId ID of the snapshot.
-     * @param outgoingSnapshot Snapshot itself.
-     */
-    void registerOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot 
outgoingSnapshot);
+public class UnsignedUuidComparator implements Comparator<UUID> {
+    @Override
+    public int compare(UUID o1, UUID o2) {
+        int highHalvesComparisonResult = 
Long.compareUnsigned(o1.getMostSignificantBits(), o2.getMostSignificantBits());
 
-    /**
-     * Unregisters a snapshot with the given ID.
-     *
-     * @param snapshotId Snapshot ID.
-     */
-    void unregisterOutgoingSnapshot(UUID snapshotId);
+        if (highHalvesComparisonResult != 0) {
+            return highHalvesComparisonResult;
+        }
+
+        return Long.compare(o1.getLeastSignificantBits(), 
o2.getLeastSignificantBits());

Review Comment:
   Should be unsigned here as well



##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java:
##########
@@ -85,7 +88,10 @@ public void remove(UUID txId) {
     /** {@inheritDoc} */
     @Override
     public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
-        return Cursor.fromIterator(storage.entrySet().stream().map(e -> new 
IgniteBiTuple<>(e.getKey(), e.getValue())).iterator());
+        List<IgniteBiTuple<UUID, TxMeta>> copy = storage.entrySet().stream()
+                .map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue()))
+                .collect(toList());
+        return Cursor.fromIterator(copy.iterator());

Review Comment:
   ```suggestion
                   .collect(toList());
   
           return Cursor.fromIterator(copy.iterator());
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -133,49 +155,35 @@ CompletableFuture<SnapshotMetaResponse> 
handleSnapshotMetaRequest(SnapshotMetaRe
     }
 
     /**
-     * Reads chunk of partition data and returns a future with the response.
+     * Reads a chunk of partition data and returns a future with the response.
      *
      * @param request Data request.
      */
     CompletableFuture<SnapshotMvDataResponse> 
handleSnapshotMvDataRequest(SnapshotMvDataRequest request) {
-        // TODO: IGNITE-17935 - executor?
-
-        assert !finished;
+        assert !finishedMvData : "MV data sending has already been finished";
 
         long totalBatchSize = 0;
         List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>();
 
         while (true) {
-            acquireLock();
-
-            try {
+            try (AutoLockup ignored = acquireMvLock()) {
                 totalBatchSize = fillWithOutOfOrderRows(batch, totalBatchSize, 
request);
 
                 totalBatchSize = tryProcessRowFromPartition(batch, 
totalBatchSize, request);
 
                 if (exhaustedPartition() && outOfOrderMvData.isEmpty()) {
-                    finished = true;
+                    finishedMvData = true;

Review Comment:
   Doesn't this field duplicate the behavior, stated by `exhaustedPartition()` 
method? If last row id is null, we have finished processing rows.
   Another question, is it possible that `exhaustedPartition()` is true and 
`outOfOrderMvData` is not empty? I don't think so. Something's too convoluted 
here.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java:
##########
@@ -103,17 +103,17 @@ public void stop() throws Exception {
     }
 
     /**
-     * Registers an outgoing snapshot in the manager.
+     * Starts an outgoing snapshot and registers it in the manager. This is 
the point where snapshot is 'taken',
+     * that is, the immutable scope of the snapshot (what MV data and what TX 
data belongs to it) is cut.
      *
      * @param snapshotId       Snapshot id.
      * @param outgoingSnapshot Outgoing snapshot.
      */
-    @Override
-    public void registerOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot 
outgoingSnapshot) {
+    void startOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot 
outgoingSnapshot) {
         snapshots.put(snapshotId, outgoingSnapshot);
 
         PartitionSnapshotsImpl partitionSnapshots = 
getPartitionSnapshots(outgoingSnapshot.partitionKey());

Review Comment:
   Why do we use the implementation type instead of the interface?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java:
##########
@@ -56,13 +56,9 @@ public OutgoingSnapshotReader(PartitionSnapshotStorage 
snapshotStorage) {
                 
.learnersList(snapshotStorage.startupSnapshotMeta().learnersList())
                 .build();
 
-        OutgoingSnapshot outgoingSnapshot = new OutgoingSnapshot(
-                id,
-                snapshotStorage.partition(),
-                snapshotStorage.outgoingSnapshotsManager()
-        );
+        OutgoingSnapshot outgoingSnapshot = new OutgoingSnapshot(id, 
snapshotStorage.partition());
 
-        
snapshotStorage.outgoingSnapshotsManager().registerOutgoingSnapshot(id, 
outgoingSnapshot);
+        snapshotStorage.outgoingSnapshotsManager().startOutgoingSnapshot(id, 
outgoingSnapshot);

Review Comment:
   How many times can we rename these methods? =)



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -360,4 +389,15 @@ public boolean alreadyPassed(RowId rowId) {
     public void enqueueForSending(RowId rowId) {
         outOfOrderMvData.add(rowEntry(rowId));
     }
+
+    /**
+     * Closes the snapshot releasing the underlying resources.
+     */
+    public void close() {
+        Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor = txDataCursor;
+
+        if (txCursor != null) {
+            closeQuietly(txCursor);
+        }

Review Comment:
   Are we seriously avoiding double volatile read here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java:
##########
@@ -84,33 +92,31 @@ public void lastAppliedIndex(long lastAppliedIndex) throws 
StorageException {
     @Override
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, 
UUID txId, UUID commitTableId,
             int commitPartitionId) throws TxIdMismatchException, 
StorageException {
-        sendRowOutOfOrderToInterferingSnapshots(rowId);
+        sendMvRowOutOfOrderToInterferingSnapshots(rowId);
 
         return partitionStorage.addWrite(rowId, row, txId, commitTableId, 
commitPartitionId);
     }
 
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException 
{
-        sendRowOutOfOrderToInterferingSnapshots(rowId);
+        sendMvRowOutOfOrderToInterferingSnapshots(rowId);
 
         return partitionStorage.abortWrite(rowId);
     }
 
     @Override
     public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
-        sendRowOutOfOrderToInterferingSnapshots(rowId);
+        sendMvRowOutOfOrderToInterferingSnapshots(rowId);
 
         partitionStorage.commitWrite(rowId, timestamp);
     }
 
-    private void sendRowOutOfOrderToInterferingSnapshots(RowId rowId) {
+    private void sendMvRowOutOfOrderToInterferingSnapshots(RowId rowId) {
         PartitionSnapshots partitionSnapshots = 
partitionsSnapshots.partitionSnapshots(partitionKey);
 
         for (OutgoingSnapshot snapshot : 
partitionSnapshots.ongoingSnapshots()) {
-            snapshot.acquireLock();
-
-            try {
-                if (snapshot.isFinished()) {
+            try (AutoLockup ignored = snapshot.acquireMvLock()) {
+                if (snapshot.isFinishedMvData()) {

Review Comment:
   `isFinished()` check can be safely removed. `alreadyPassed()` is enough



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -332,15 +361,15 @@ public boolean addRowIdToSkip(RowId rowId) {
 
     /**
      * Returns {@code true} if the given {@link RowId} does not interfere with 
the rows that this snapshot is going
-     * to be sent in the normal snapshot rows sending order.
+     * to send in the normal snapshot rows sending order.
      *
-     * <p>Must be called under snapshot lock.
+     * <p>Must be called under MV data snapshot lock.
      *
      * @param rowId RowId.
      * @return {@code true} if the given RowId is already passed by the 
snapshot in normal rows sending order.
      */
     public boolean alreadyPassed(RowId rowId) {
-        if (!startedToReadPartition) {
+        if (!startedToReadMvPartition) {

Review Comment:
   It would be nice to ban (MIN_VALUE, MIN_VALUE) as a valid row id in 
partitions. Then we could have removed this flag.



##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/UnsignedUuidComparator.java:
##########
@@ -15,26 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+package org.apache.ignite.internal.tx.storage.state;
 
+import java.util.Comparator;
 import java.util.UUID;
 
 /**
- * Registry of {@link OutgoingSnapshot}s.
+ * {@link Comparator} for {@link UUID} instances that orders them interpreting 
them as unsigned 128-bit integers.
  */
-public interface OutgoingSnapshotRegistry {
-    /**
-     * Register a snapshot with the registry.
-     *
-     * @param snapshotId ID of the snapshot.
-     * @param outgoingSnapshot Snapshot itself.
-     */
-    void registerOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot 
outgoingSnapshot);
+public class UnsignedUuidComparator implements Comparator<UUID> {
+    @Override
+    public int compare(UUID o1, UUID o2) {
+        int highHalvesComparisonResult = 
Long.compareUnsigned(o1.getMostSignificantBits(), o2.getMostSignificantBits());

Review Comment:
   Usually `cmp` is enough for the name



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java:
##########
@@ -63,7 +64,7 @@ public interface TxStateStorage extends AutoCloseable {
      * @throws IgniteInternalException with {@link 
Transactions#TX_STATE_STORAGE_ERR} error code in case when
      *                                 the operation has failed.
      */
-    boolean compareAndSet(UUID txId, TxState txStateExpected, TxMeta txMeta, 
long commandIndex);
+    boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta 
txMeta, long commandIndex);

Review Comment:
   Again, please specify the meaning of `null`. Is this documented somewhere?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java:
##########
@@ -24,13 +24,15 @@
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
 
 /** Transaction meta. */
 public class TxMeta implements Serializable {
     /** Serial version UID. */
     private static final long serialVersionUID = -172513482743911860L;
 
     /** Tx state. */
+    @Nullable

Review Comment:
   What does `null` mean in this context? Is it really nullable?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java:
##########
@@ -103,17 +103,17 @@ public void stop() throws Exception {
     }
 
     /**
-     * Registers an outgoing snapshot in the manager.
+     * Starts an outgoing snapshot and registers it in the manager. This is 
the point where snapshot is 'taken',
+     * that is, the immutable scope of the snapshot (what MV data and what TX 
data belongs to it) is cut.
      *
      * @param snapshotId       Snapshot id.
      * @param outgoingSnapshot Outgoing snapshot.
      */
-    @Override
-    public void registerOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot 
outgoingSnapshot) {
+    void startOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot 
outgoingSnapshot) {
         snapshots.put(snapshotId, outgoingSnapshot);
 
         PartitionSnapshotsImpl partitionSnapshots = 
getPartitionSnapshots(outgoingSnapshot.partitionKey());

Review Comment:
   I just noticed it. Isn't it the point of having interfaces? Do we use some 
methods that don't exist in the interface?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to