rpuch commented on code in PR #1887:
URL: https://github.com/apache/ignite-3/pull/1887#discussion_r1157337143


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetPrefixCommand.java:
##########
@@ -15,20 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.command.cursor;
+package org.apache.ignite.internal.metastorage.command;
 
-import 
org.apache.ignite.internal.metastorage.command.MetastorageCommandsMessageGroup;
-import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Cursor close command for MetaStorageCommandListener that closes cursor with 
given id.
+ * Range command for MetaStorageCommandListener that retrieves entries for the 
given key prefix in lexicographic order. Entries will be
+ * filtered out by upper bound of given revision number.
  */
-@Transferable(MetastorageCommandsMessageGroup.CLOSE_CURSOR)
-public interface CloseCursorCommand extends WriteCommand {
-    /**
-     * Returns cursor id.
-     */
-    IgniteUuid cursorId();
+@Transferable(MetastorageCommandsMessageGroup.GET_PREFIX)
+public interface GetPrefixCommand extends PaginationCommand {
+    byte[] prefix();

Review Comment:
   Should we have a javadoc here?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorSubscription.java:
##########
@@ -130,7 +126,9 @@ private void processRequest() {
                     demand--;
                 } else {
                     if (cachedResponse.hasNextBatch()) {
-                        requestNextBatch();
+                        byte[] lastProcessedKey = entries.get(entries.size() - 
1).key();

Review Comment:
   Is there a guarantee that `entries` is not empty here?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -483,21 +485,17 @@ public Collection<Entry> getAndPutAll(List<byte[]> keys, 
List<byte[]> values) {
         return res;
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Entry get(byte[] key) {
         rwLock.readLock().lock();

Review Comment:
   It seems that the read lock is only needed here to make sure `rev` is read 
without a race. `doGet()` only reads the `index` and `data` fields, which don't 
seem to need read lock protection. So it looks like we could replace this with 
just `doGet(key, revision());`.
   
   Is the lock taken here for consistency with other places, or for the sake of 
safety (if something changes in the future regarding the fields read), or am I 
missing something?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -509,21 +507,28 @@ public Entry get(byte[] key, long revUpperBound) {
         }
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Collection<Entry> getAll(List<byte[]> keys) {
-        return doGetAll(keys, LATEST_REV);
+        rwLock.readLock().lock();

Review Comment:
   Same as above



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -758,23 +757,95 @@ private void applyOperations(Collection<Operation> ops) 
throws RocksDBException
     }
 
     @Override
-    public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, boolean 
includeTombstones) {
-        return new RangeCursor(this, keyFrom, keyTo, rev, includeTombstones);
-    }
+    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
+        rwLock.readLock().lock();
 
-    @Override
-    public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long 
revUpperBound, boolean includeTombstones) {
-        return new RangeCursor(this, keyFrom, keyTo, revUpperBound, 
includeTombstones);
+        try {
+            return range(keyFrom, keyTo, rev);
+        } finally {
+            rwLock.readLock().unlock();
+        }
     }
 
     @Override
-    public Cursor<Entry> prefix(byte[] prefix, boolean includeTombstones) {
-        return prefix(prefix, rev, includeTombstones);
-    }
+    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long 
revUpperBound) {
+        rwLock.readLock().lock();

Review Comment:
   Do we need a read lock here?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java:
##########
@@ -77,159 +67,89 @@ public void onRead(Iterator<CommandClosure<ReadCommand>> 
iter) {
 
             ReadCommand command = clo.command();
 
-            if (command instanceof GetCommand) {
-                GetCommand getCmd = (GetCommand) command;
-
-                Entry e;
-
-                if (getCmd.revision() != 0) {
-                    e = storage.get(getCmd.key(), getCmd.revision());
-                } else {
-                    e = storage.get(getCmd.key());
-                }
-
-                clo.result(e);
-            } else if (command instanceof GetAllCommand) {
-                GetAllCommand getAllCmd = (GetAllCommand) command;
-
-                Collection<Entry> entries;
-
-                if (getAllCmd.revision() != 0) {
-                    entries = storage.getAll(getAllCmd.keys(), 
getAllCmd.revision());
-                } else {
-                    entries = storage.getAll(getAllCmd.keys());
-                }
-
-                clo.result((Serializable) entries);
-            } else {
-                assert false : "Command was not found [cmd=" + command + ']';
-            }
-        }
-    }
-
-    @Override
-    public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
-        while (iter.hasNext()) {
-            CommandClosure<WriteCommand> clo = iter.next();
-
-            if (writeHandler.handleWriteCommand(clo)) {
-                continue;
-            }
-
-            WriteCommand command = clo.command();
-
-            if (command instanceof CreateRangeCursorCommand) {
-                var rangeCmd = (CreateRangeCursorCommand) command;
-
-                IgniteUuid cursorId = rangeCmd.cursorId();
-
-                Cursor<Entry> cursor = rangeCmd.revUpperBound() != -1
-                        ? storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(), 
rangeCmd.revUpperBound(), rangeCmd.includeTombstones())
-                        : storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(), 
rangeCmd.includeTombstones());
-
-                var cursorMeta = new CursorMeta(cursor, 
rangeCmd.requesterNodeId());
-
-                cursors.put(cursorId, cursorMeta);
-
-                clo.result(cursorId);
-            } else if (command instanceof CreatePrefixCursorCommand) {
-                var prefixCmd = (CreatePrefixCursorCommand) command;
-
-                IgniteUuid cursorId = prefixCmd.cursorId();
-
-                Cursor<Entry> cursor = prefixCmd.revUpperBound() == -1
-                        ? storage.prefix(prefixCmd.prefix(), 
prefixCmd.includeTombstones())
-                        : storage.prefix(prefixCmd.prefix(), 
prefixCmd.revUpperBound(), prefixCmd.includeTombstones());
-
-                var cursorMeta = new CursorMeta(cursor, 
prefixCmd.requesterNodeId());
-
-                cursors.put(cursorId, cursorMeta);
+            try {
+                if (command instanceof GetCommand) {
+                    GetCommand getCmd = (GetCommand) command;
 
-                clo.result(cursorId);
-            } else if (command instanceof NextBatchCommand) {
-                var nextBatchCommand = (NextBatchCommand) command;
+                    Entry e;
 
-                CursorMeta cursorMeta = 
cursors.get(nextBatchCommand.cursorId());
+                    if (getCmd.revision() != 0) {

Review Comment:
   So the smallest possible revision is 1?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -483,21 +485,17 @@ public Collection<Entry> getAndPutAll(List<byte[]> keys, 
List<byte[]> values) {
         return res;
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Entry get(byte[] key) {
         rwLock.readLock().lock();
 
         try {
-            return doGet(key, LATEST_REV);
+            return doGet(key, rev);
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Entry get(byte[] key, long revUpperBound) {
         rwLock.readLock().lock();

Review Comment:
   Same question about the read lock as above



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -758,23 +757,95 @@ private void applyOperations(Collection<Operation> ops) 
throws RocksDBException
     }
 
     @Override
-    public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, boolean 
includeTombstones) {
-        return new RangeCursor(this, keyFrom, keyTo, rev, includeTombstones);
-    }
+    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
+        rwLock.readLock().lock();
 
-    @Override
-    public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long 
revUpperBound, boolean includeTombstones) {
-        return new RangeCursor(this, keyFrom, keyTo, revUpperBound, 
includeTombstones);
+        try {
+            return range(keyFrom, keyTo, rev);
+        } finally {
+            rwLock.readLock().unlock();
+        }
     }
 
     @Override
-    public Cursor<Entry> prefix(byte[] prefix, boolean includeTombstones) {
-        return prefix(prefix, rev, includeTombstones);
-    }
+    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long 
revUpperBound) {
+        rwLock.readLock().lock();
 
-    @Override
-    public Cursor<Entry> prefix(byte[] prefix, long revUpperBound, boolean 
includeTombstones) {
-        return new RangeCursor(this, prefix, incrementPrefix(prefix), 
revUpperBound, includeTombstones);
+        try {
+            var readOpts = new ReadOptions();
+
+            var upperBound = keyTo == null ? null : new Slice(keyTo);
+
+            readOpts.setIterateUpperBound(upperBound);
+
+            RocksIterator iterator = index.newIterator(readOpts);
+
+            iterator.seek(keyFrom);
+
+            return new RocksIteratorAdapter<>(iterator) {
+                /** Cached entry used to filter "empty" values. */
+                @Nullable
+                private Entry next;
+
+                @Override
+                public boolean hasNext() {
+                    if (next != null) {
+                        return true;
+                    }
+
+                    while (next == null && super.hasNext()) {
+                        Entry nextCandidate = decodeEntry(it.key(), 
it.value());
+
+                        it.next();
+
+                        if (!nextCandidate.empty()) {
+                            next = nextCandidate;
+
+                            return true;
+                        }
+                    }
+
+                    return false;
+                }
+
+                @Override
+                public Entry next() {
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+
+                    Entry result = next;
+
+                    assert result != null;
+
+                    next = null;
+
+                    return result;
+                }
+
+                @Override
+                protected Entry decodeEntry(byte[] key, byte[] value) {
+                    long[] revisions = getAsLongs(value);
+
+                    long targetRevision = maxRevision(revisions, 
revUpperBound);
+
+                    if (targetRevision == -1) {
+                        return EntryImpl.empty(key);
+                    }
+
+                    return doGetValue(key, targetRevision);

Review Comment:
   No read lock is taken around `doGetValue()`, but for ordinary `get()` and 
`getAll()` it is taken. It does not seem to be necessary (as mentioned in the 
comments above), I'm just leaving a note here: should all `doGetValue()` calls 
be made consistently, either always under a lock or always without one?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -63,7 +63,7 @@ class MetaStorageWriteHandler {
      * Tries to process a {@link WriteCommand}, returning {@code true} if the 
command has been successfully processed or {@code false} if

Review Comment:
   The method does not return anything now, so this description is out of date.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -509,21 +507,28 @@ public Entry get(byte[] key, long revUpperBound) {
         }
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Collection<Entry> getAll(List<byte[]> keys) {
-        return doGetAll(keys, LATEST_REV);
+        rwLock.readLock().lock();
+
+        try {
+            return doGetAll(keys, rev);
+        } finally {
+            rwLock.readLock().unlock();
+        }
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
-        return doGetAll(keys, revUpperBound);
+        rwLock.readLock().lock();

Review Comment:
   Same as above



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java:
##########
@@ -77,159 +67,89 @@ public void onRead(Iterator<CommandClosure<ReadCommand>> 
iter) {
 
             ReadCommand command = clo.command();
 
-            if (command instanceof GetCommand) {
-                GetCommand getCmd = (GetCommand) command;
-
-                Entry e;
-
-                if (getCmd.revision() != 0) {
-                    e = storage.get(getCmd.key(), getCmd.revision());
-                } else {
-                    e = storage.get(getCmd.key());
-                }
-
-                clo.result(e);
-            } else if (command instanceof GetAllCommand) {
-                GetAllCommand getAllCmd = (GetAllCommand) command;
-
-                Collection<Entry> entries;
-
-                if (getAllCmd.revision() != 0) {
-                    entries = storage.getAll(getAllCmd.keys(), 
getAllCmd.revision());
-                } else {
-                    entries = storage.getAll(getAllCmd.keys());
-                }
-
-                clo.result((Serializable) entries);
-            } else {
-                assert false : "Command was not found [cmd=" + command + ']';
-            }
-        }
-    }
-
-    @Override
-    public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
-        while (iter.hasNext()) {
-            CommandClosure<WriteCommand> clo = iter.next();
-
-            if (writeHandler.handleWriteCommand(clo)) {
-                continue;
-            }
-
-            WriteCommand command = clo.command();
-
-            if (command instanceof CreateRangeCursorCommand) {
-                var rangeCmd = (CreateRangeCursorCommand) command;
-
-                IgniteUuid cursorId = rangeCmd.cursorId();
-
-                Cursor<Entry> cursor = rangeCmd.revUpperBound() != -1
-                        ? storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(), 
rangeCmd.revUpperBound(), rangeCmd.includeTombstones())
-                        : storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(), 
rangeCmd.includeTombstones());
-
-                var cursorMeta = new CursorMeta(cursor, 
rangeCmd.requesterNodeId());
-
-                cursors.put(cursorId, cursorMeta);
-
-                clo.result(cursorId);
-            } else if (command instanceof CreatePrefixCursorCommand) {
-                var prefixCmd = (CreatePrefixCursorCommand) command;
-
-                IgniteUuid cursorId = prefixCmd.cursorId();
-
-                Cursor<Entry> cursor = prefixCmd.revUpperBound() == -1
-                        ? storage.prefix(prefixCmd.prefix(), 
prefixCmd.includeTombstones())
-                        : storage.prefix(prefixCmd.prefix(), 
prefixCmd.revUpperBound(), prefixCmd.includeTombstones());
-
-                var cursorMeta = new CursorMeta(cursor, 
prefixCmd.requesterNodeId());
-
-                cursors.put(cursorId, cursorMeta);
+            try {
+                if (command instanceof GetCommand) {
+                    GetCommand getCmd = (GetCommand) command;
 
-                clo.result(cursorId);
-            } else if (command instanceof NextBatchCommand) {
-                var nextBatchCommand = (NextBatchCommand) command;
+                    Entry e;
 
-                CursorMeta cursorMeta = 
cursors.get(nextBatchCommand.cursorId());
+                    if (getCmd.revision() != 0) {
+                        e = storage.get(getCmd.key(), getCmd.revision());
+                    } else {
+                        e = storage.get(getCmd.key());
+                    }
 
-                if (cursorMeta == null) {
-                    clo.result(new NoSuchElementException("Corresponding 
cursor on the server side is not found."));
+                    clo.result(e);
+                } else if (command instanceof GetAllCommand) {
+                    GetAllCommand getAllCmd = (GetAllCommand) command;
 
-                    return;
-                }
+                    Collection<Entry> entries;
 
-                try {
-                    var resp = new 
ArrayList<Entry>(nextBatchCommand.batchSize());
+                    if (getAllCmd.revision() != 0) {
+                        entries = storage.getAll(getAllCmd.keys(), 
getAllCmd.revision());
+                    } else {
+                        entries = storage.getAll(getAllCmd.keys());
+                    }
 
-                    Cursor<Entry> cursor = cursorMeta.cursor();
+                    clo.result((Serializable) entries);
+                } else if (command instanceof GetRangeCommand) {
+                    var rangeCmd = (GetRangeCommand) command;
 
-                    for (int i = 0; i < nextBatchCommand.batchSize() && 
cursor.hasNext(); i++) {
-                        resp.add(cursor.next());
-                    }
+                    byte[] keyFrom = rangeCmd.previousKey() == null
+                            ? rangeCmd.keyFrom()
+                            : 
requireNonNull(storage.nextKey(rangeCmd.previousKey()));
 
-                    if (!cursor.hasNext()) {
-                        closeCursor(nextBatchCommand.cursorId());
-                    }
+                    clo.result(handlePaginationCommand(keyFrom, 
rangeCmd.keyTo(), rangeCmd));
+                } else if (command instanceof GetPrefixCommand) {
+                    var prefixCmd = (GetPrefixCommand) command;
 
-                    clo.result(new BatchResponse(resp, cursor.hasNext()));
-                } catch (Exception e) {
-                    clo.result(e);
-                }
-            } else if (command instanceof CloseCursorCommand) {
-                var closeCursorCommand = (CloseCursorCommand) command;
+                    byte[] keyFrom = prefixCmd.previousKey() == null
+                            ? prefixCmd.prefix()
+                            : 
requireNonNull(storage.nextKey(prefixCmd.previousKey()));
 
-                try {
-                    closeCursor(closeCursorCommand.cursorId());
+                    byte[] keyTo = storage.nextKey(prefixCmd.prefix());
 
-                    clo.result(null);
-                } catch (Exception e) {
-                    clo.result(new MetaStorageException(CURSOR_CLOSING_ERR, 
e));
+                    clo.result(handlePaginationCommand(keyFrom, keyTo, 
prefixCmd));
+                } else {
+                    assert false : "Command was not found [cmd=" + command + 
']';
                 }
-            } else if (command instanceof CloseAllCursorsCommand) {
-                var cursorsCloseCmd = (CloseAllCursorsCommand) command;
-
-                Iterator<CursorMeta> cursorsIter = cursors.values().iterator();
+            } catch (Exception e) {
+                clo.result(e);
+            }
+        }
+    }
 
-                Exception ocurredException = null;
+    private BatchResponse handlePaginationCommand(byte[] keyFrom, byte 
@Nullable [] keyTo, PaginationCommand command) {
+        Cursor<Entry> cursor = command.revUpperBound() == -1
+                ? storage.range(keyFrom, keyTo)
+                : storage.range(keyFrom, keyTo, command.revUpperBound());
 
-                while (cursorsIter.hasNext()) {
-                    CursorMeta cursorDesc = cursorsIter.next();
+        try (cursor) {
+            var entries = new ArrayList<Entry>();
 
-                    if 
(cursorDesc.requesterNodeId().equals(cursorsCloseCmd.nodeId())) {
-                        try {
-                            cursorDesc.cursor().close();
-                        } catch (Exception e) {
-                            if (ocurredException == null) {
-                                ocurredException = e;
-                            } else {
-                                ocurredException.addSuppressed(e);
-                            }
-                        }
+            for (Entry entry : cursor) {
+                if (command.includeTombstones() || !entry.tombstone()) {
+                    entries.add(entry);
 
-                        cursorsIter.remove();
+                    if (entries.size() == command.batchSize()) {

Review Comment:
   Is it possible to send 0 as `batchSize()`? It seems that if zero gets sent, 
we'll operate in an unbounded manner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to