AMashenkov commented on code in PR #1205:
URL: https://github.com/apache/ignite-3/pull/1205#discussion_r1012636163


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -569,6 +584,184 @@ private CompletableFuture<ArrayList<BinaryRow>> 
processScanRetrieveBatchAction(R
         });
     }
 
+    /**
+     * Scans sorted index in RW tx.
+     *
+     * @param request Index scan request.
+     * @param indexStorage Index storage.
+     * @return Opreation future.
+     */
+    private CompletableFuture<List<BinaryRow>> 
scanSortedIndex(ReadWriteScanRetrieveBatchReplicaRequest request,
+            SortedIndexStorage indexStorage) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        UUID indexId = request.indexToUse();
+
+        BinaryTuple lowerBound = request.lowerBound();
+        BinaryTuple upperBound = request.upperBound();
+
+        int flags = request.flags();
+
+        boolean includeUpperBound = (flags & SortedIndexStorage.LESS_OR_EQUAL) 
!= 0;
+
+        return lockManager.acquire(txId, new LockKey(indexId), 
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
+            return lockManager.acquire(txId, new LockKey(tableId), 
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
+                @SuppressWarnings("resource") Cursor<IndexRow> cursor = 
(Cursor<IndexRow>) cursors.computeIfAbsent(cursorId,
+                        id -> {
+                            // TODO 
https://issues.apache.org/jira/browse/IGNITE-18057
+                            // Fix scan cursor return item closet to 
lowerbound and <= lowerbound
+                            // to correctly lock range between lowerbound 
value and the item next to lowerbound.
+                            return indexStorage.scan(
+                                    lowerBound == null ? null : 
BinaryTuplePrefix.fromBinaryTuple(lowerBound),
+                                    // We need upperBound next value for 
correct range lock.
+                                    null, 
//BinaryTuplePrefix.fromBinaryTuple(upperBound),
+                                    flags
+                            );
+                        });
+
+                IndexLocker indexLocker = indexesLockers.get().get(indexId);
+
+                final ArrayList<BinaryRow> result = new 
ArrayList<>(batchCount);
+
+                return continueIndexScan(txId, indexId, indexLocker, cursor, 
upperBound, includeUpperBound, batchCount, result)
+                        .thenApply(ignore -> result);
+            });
+        });
+    }
+
+    /**
+     * Scans sorted index in RO tx.
+     *
+     * @param request Index scan request.
+     * @param indexStorage Index storage.
+     * @return Opreation future.
+     */
+    private CompletableFuture<List<BinaryRow>> scanSortedIndex(
+            ReadOnlyScanRetrieveBatchReplicaRequest request,
+            SortedIndexStorage indexStorage
+    ) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+        HybridTimestamp timestamp = request.readTimestamp();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        BinaryTuple lowerBound = request.lowerBound();
+        BinaryTuple upperBound = request.upperBound();
+
+        int flags = request.flags();
+
+        @SuppressWarnings("resource") Cursor<IndexRow> cursor = 
(Cursor<IndexRow>) cursors.computeIfAbsent(cursorId,
+                id -> {
+                    return indexStorage.scan(
+                            lowerBound == null ? null : 
BinaryTuplePrefix.fromBinaryTuple(lowerBound),
+                            upperBound == null ? null : 
BinaryTuplePrefix.fromBinaryTuple(upperBound),
+                            flags
+                    );
+                });
+
+        final ArrayList<BinaryRow> result = new ArrayList<>(batchCount);
+
+        return continueReadOnlyIndexScan(cursor, timestamp, batchCount, result)
+                .thenCompose(ignore -> 
CompletableFuture.completedFuture(result));
+    }
+
+    CompletableFuture<Void> continueReadOnlyIndexScan(
+            Cursor<IndexRow> cursor,
+            HybridTimestamp timestamp,
+            int batchSize,
+            List<BinaryRow> result
+    ) {
+        if (result.size() >= batchSize || !cursor.hasNext()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        IndexRow indexRow = cursor.next();
+
+        RowId rowId = indexRow.rowId();
+
+        ReadResult readResult = mvDataStorage.read(rowId, 
HybridTimestamp.MAX_VALUE);
+
+        return resolveReadResult(readResult, timestamp, () -> {
+            if (readResult.newestCommitTimestamp() == null) {
+                return null;
+            }
+
+            ReadResult committedReadResult = mvDataStorage.read(rowId, 
readResult.newestCommitTimestamp());
+
+            assert !committedReadResult.isWriteIntent() :
+                    "The result is not committed [rowId=" + rowId + ", 
timestamp="
+                            + readResult.newestCommitTimestamp() + ']';
+
+            return committedReadResult.binaryRow();
+        }).thenCompose(resolvedReadResult -> {
+            if (resolvedReadResult != null) {
+                result.add(resolvedReadResult);
+            }
+            return continueReadOnlyIndexScan(cursor, timestamp, batchSize, 
result);

Review Comment:
   I've checked. Java doesn't optimize tail recursion.
   So, wrap recursive call into supplyAsync



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to