tkalkirill commented on code in PR #2161:
URL: https://github.com/apache/ignite-3/pull/2161#discussion_r1222612347


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;

Review Comment:
   Optional: What about `NULL_INDEX_ROW`? It seems more obvious.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;

Review Comment:
   ```suggestion
           private final @Nullable K lower;
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;

Review Comment:
   ```suggestion
           private @Nullable Boolean hasNext;
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;
+
+        @Nullable
+        private V peekedRow = (V) NO_INDEX_ROW;

Review Comment:
   ```suggestion
           private @Nullable V peekedRow = (V) NO_INDEX_ROW;
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;

Review Comment:
   ```suggestion
           private @Nullable V treeRow;
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;
+
+        @Nullable
+        private V peekedRow = (V) NO_INDEX_ROW;
+
+        protected ScanCursor(@Nullable K lower, BplusTree<K, V> indexTree) {
+            this.lower = lower;
+            this.indexTree = indexTree;
+        }
+
+        /**
+         * Maps value from the index tree into the required result.
+         */
+        protected abstract R map(V value);
+
+        /**
+         * Check whether the passed value exceeds the upper bound for the scan.
+         */
+        protected abstract boolean halt(V value);

Review Comment:
   Not an obvious name; maybe call it `exceedUpperBound`?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;
+
+        @Nullable
+        private V peekedRow = (V) NO_INDEX_ROW;
+
+        protected ScanCursor(@Nullable K lower, BplusTree<K, V> indexTree) {
+            this.lower = lower;
+            this.indexTree = indexTree;
+        }
+
+        /**
+         * Maps value from the index tree into the required result.
+         */
+        protected abstract R map(V value);
+
+        /**
+         * Check whether the passed value exceeds the upper bound for the scan.
+         */
+        protected abstract boolean halt(V value);
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(() -> {
+                try {
+                    return advanceIfNeeded();
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the 
cursor", e);
+                }
+            });
+        }
+
+        @Override
+        public R next() {
+            return busy(() -> {
+                try {
+                    if (!advanceIfNeeded()) {
+                        throw new NoSuchElementException();
+                    }
+
+                    this.hasNext = null;
+
+                    return map(treeRow);
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the 
cursor", e);
+                }
+            });
+        }
+
+        @Override
+        public @Nullable R peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractPageMemoryIndexStorage.this::createStorageInfo);
+
+                try {
+                    return map(peekBusy());
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error when peeking next 
element", e);
+                }
+            });
+        }
+
+        private @Nullable V peekBusy() throws IgniteInternalCheckedException {
+            if (hasNext != null) {
+                return treeRow;
+            }
+
+            if (treeRow == null) {
+                peekedRow = lower == null ? indexTree.findFirst() : 
indexTree.findNext(lower, true);
+            } else {
+                peekedRow = indexTree.findNext(treeRow, false);
+            }
+
+            if (peekedRow != null && halt(peekedRow)) {
+                peekedRow = null;
+            }
+
+            return peekedRow;
+        }
+
+        private boolean advanceIfNeeded() throws 
IgniteInternalCheckedException {

Review Comment:
   ```suggestion
           private boolean advanceIfNeededBusy() throws 
IgniteInternalCheckedException {
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;

Review Comment:
   Missing javadoc



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;

Review Comment:
   https://memesmix.net/media/created/250/k5md75.jpg



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;

Review Comment:
   Missing javadoc



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;

Review Comment:
   ```suggestion
           private @Nullable Boolean hasNext;
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;
+
+        private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
+
+        UpToDatePeekCursor(byte[] upperBound, ColumnFamily indexCf, 
Function<ByteBuffer, T> mapper, byte[] lowerBound) {
+            this.lowerBound = lowerBound;
+            upperBoundSlice = new Slice(upperBound);
+            options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+            it = indexCf.newIterator(options);
+
+            this.mapper = mapper;
+        }
+
+        @Override
+        public void close() {
+            try {
+                closeAll(it, options, upperBoundSlice);
+            } catch (Exception e) {
+                throw new StorageException("Error closing cursor", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(this::advanceIfNeeded);
+        }
+
+        @Override
+        public T next() {
+            return busy(() -> {
+                if (!advanceIfNeeded()) {
+                    throw new NoSuchElementException();
+                }
+
+                this.hasNext = null;
+
+                return 
mapper.apply(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
+            });
+        }
+
+        @Override
+        public @Nullable T peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+                byte[] res = peek0();
+
+                if (res == null) {
+                    return null;
+                } else {
+                    return 
mapper.apply(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
+                }
+            });
+        }
+
+        private byte @Nullable [] peek0() {
+            if (hasNext != null) {
+                return key;
+            }
+
+            refreshAndPrepareRocksIterator();
+
+            if (!it.isValid()) {
+                RocksUtils.checkIterator(it);
+
+                peekedKey = null;
+            } else {
+                peekedKey = it.key();
+            }
+
+            return peekedKey;
+        }
+
+        private boolean advanceIfNeeded() throws StorageException {

Review Comment:
   ```suggestion
           private boolean advanceIfNeededBusy() throws StorageException {
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;
+
+        private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
+
+        UpToDatePeekCursor(byte[] upperBound, ColumnFamily indexCf, 
Function<ByteBuffer, T> mapper, byte[] lowerBound) {
+            this.lowerBound = lowerBound;
+            upperBoundSlice = new Slice(upperBound);
+            options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+            it = indexCf.newIterator(options);
+
+            this.mapper = mapper;
+        }
+
+        @Override
+        public void close() {
+            try {
+                closeAll(it, options, upperBoundSlice);
+            } catch (Exception e) {
+                throw new StorageException("Error closing cursor", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(this::advanceIfNeeded);
+        }
+
+        @Override
+        public T next() {
+            return busy(() -> {
+                if (!advanceIfNeeded()) {
+                    throw new NoSuchElementException();
+                }
+
+                this.hasNext = null;
+
+                return 
mapper.apply(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
+            });
+        }
+
+        @Override
+        public @Nullable T peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+                byte[] res = peek0();
+
+                if (res == null) {
+                    return null;
+                } else {
+                    return 
mapper.apply(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
+                }
+            });
+        }
+
+        private byte @Nullable [] peek0() {
+            if (hasNext != null) {
+                return key;
+            }
+
+            refreshAndPrepareRocksIterator();
+
+            if (!it.isValid()) {
+                RocksUtils.checkIterator(it);
+
+                peekedKey = null;
+            } else {
+                peekedKey = it.key();
+            }
+
+            return peekedKey;
+        }
+
+        private boolean advanceIfNeeded() throws StorageException {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+            //noinspection ArrayEquality
+            key = (peekedKey == BYTE_EMPTY_ARRAY) ? peek0() : peekedKey;
+            peekedKey = BYTE_EMPTY_ARRAY;
+
+            hasNext = key != null;
+            return hasNext;
+        }
+
+        private void refreshAndPrepareRocksIterator() {

Review Comment:
   ```suggestion
           private void refreshAndPrepareRocksIteratorBusy() {
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;
+
+        @Nullable
+        private V peekedRow = (V) NO_INDEX_ROW;

Review Comment:
   Missing javadoc



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;
+
+        private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;

Review Comment:
   Missing javadoc



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;
+
+        private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
+
+        UpToDatePeekCursor(byte[] upperBound, ColumnFamily indexCf, 
Function<ByteBuffer, T> mapper, byte[] lowerBound) {
+            this.lowerBound = lowerBound;
+            upperBoundSlice = new Slice(upperBound);
+            options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+            it = indexCf.newIterator(options);
+
+            this.mapper = mapper;
+        }
+
+        @Override
+        public void close() {
+            try {
+                closeAll(it, options, upperBoundSlice);
+            } catch (Exception e) {
+                throw new StorageException("Error closing cursor", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(this::advanceIfNeeded);
+        }
+
+        @Override
+        public T next() {
+            return busy(() -> {
+                if (!advanceIfNeeded()) {
+                    throw new NoSuchElementException();
+                }
+
+                this.hasNext = null;
+
+                return 
mapper.apply(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
+            });
+        }
+
+        @Override
+        public @Nullable T peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+                byte[] res = peek0();
+
+                if (res == null) {
+                    return null;
+                } else {
+                    return 
mapper.apply(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
+                }
+            });
+        }
+
+        private byte @Nullable [] peek0() {

Review Comment:
   ```suggestion
           private byte @Nullable [] peekBusy() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to