sashapolo commented on a change in pull request #675:
URL: https://github.com/apache/ignite-3/pull/675#discussion_r810202885



##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
##########
@@ -80,8 +76,12 @@
     WatchCursor(RocksDbKeyValueStorage storage, long rev, Predicate<byte[]> 
predicate) {
         this.storage = storage;
         this.predicate = predicate;
-        this.lastRetRev = rev - 1;
+
+        this.currentRevision = rev;
+
         this.nativeIterator = storage.newDataIterator(options);
+        this.nativeIterator.seek(longToBytes(rev));
+
         this.it = createIterator();

Review comment:
       There's no need for the inner iterator. You can simply write `return 
this` in the `Iterator<WatchEvent> iterator()` method and inline the `hasNext` 
and `next` methods

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
##########
@@ -181,58 +172,51 @@ public WatchEvent next() {
                 storage.lock().readLock().lock();
 
                 try {
-                    while (true) {
-                        if (!hasNext()) {
-                            return null;
-                        }
-
-                        var ref = new Object() {
-                            boolean noItemsInRevision = true;
-                        };
+                    if (!hasNext()) {
+                        return null;
+                    }
 
-                        List<EntryEvent> evts = new ArrayList<>();
+                    List<EntryEvent> evts = new ArrayList<>();
 
-                        // Iterate over the keys of the current revision and 
get all matching entries.
-                        RocksUtils.forEach(nativeIterator, (k, v) -> {
-                            ref.noItemsInRevision = false;
+                    // Iterate over the keys of the current revision and get 
all matching entries.
+                    for (; nativeIterator.isValid(); nativeIterator.next()) {
+                        byte[] rocksKey = nativeIterator.key();
+                        byte[] rocksValue = nativeIterator.value();
 
-                            byte[] key = rocksKeyToBytes(k);
+                        long revision = revisionFromRocksKey(rocksKey);
 
-                            Value val = bytesToValue(v);
+                        if (revision > currentRevision) {
+                            // There are no more keys for the current revision
+                            break;
+                        }
 
-                            if (predicate.test(key)) {
-                                Entry newEntry;
+                        byte[] key = rocksKeyToBytes(rocksKey);
 
-                                if (val.tombstone()) {
-                                    newEntry = Entry.tombstone(key, 
nextRetRev, val.updateCounter());
-                                } else {
-                                    newEntry = new Entry(key, val.bytes(), 
nextRetRev, val.updateCounter());
-                                }
+                        Value val = bytesToValue(rocksValue);

Review comment:
       This declaration can be moved inside `if` below

##########
File path: 
modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
##########
@@ -2167,6 +2203,63 @@ public void watchCursorForKey() {
         assertFalse(it.hasNext());
     }
 
+    @Test
+    public void watchCursorForKeySkipNonMatchingEntries() {
+        byte[] key1 = key(1);
+        final byte[] val1_1 = keyValue(1, 11);
+        final byte[] val1_2 = keyValue(1, 12);
+
+        final byte[] key2 = key(2);
+        final byte[] val2 = keyValue(2, 21);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        Cursor<WatchEvent> cur = storage.watch(key2, 1);
+
+        Iterator<WatchEvent> it = cur.iterator();

Review comment:
       you don't need this variable, `cur` already implements `Iterator`

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
##########
@@ -181,58 +172,51 @@ public WatchEvent next() {
                 storage.lock().readLock().lock();
 
                 try {
-                    while (true) {
-                        if (!hasNext()) {
-                            return null;
-                        }
-
-                        var ref = new Object() {
-                            boolean noItemsInRevision = true;
-                        };
+                    if (!hasNext()) {
+                        return null;

Review comment:
       returning `null` in `next()` breaks the `Iterator` contract, it must 
throw `NoSuchElementException` instead

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
##########
@@ -125,50 +125,41 @@ public boolean hasNext() {
                 storage.lock().readLock().lock();
 
                 try {
-                    if (nextRetRev != -1) {
-                        // Next revision is already calculated and is not -1, 
meaning that there is a set of keys
-                        // matching the revision and the predicate.
+                    if (currentHasNext) {
                         return true;
                     }
 
-                    while (true) {
-                        long curRev = lastRetRev + 1;
-
-                        byte[] revisionPrefix = longToBytes(curRev);
-
-                        boolean empty = true;
+                    if (!nativeIterator.isValid()) {
+                        try {
+                            nativeIterator.refresh();
 
-                        if (!nativeIterator.isValid()) {
-                            try {
-                                nativeIterator.refresh();
-                            } catch (RocksDBException e) {
-                                throw new IgniteInternalException(e);
-                            }
+                            nativeIterator.seek(longToBytes(currentRevision));
+                        } catch (RocksDBException e) {
+                            throw new IgniteInternalException(e);
                         }
+                    }
 
-                        // Check all keys by the revision to see if any one of 
them match the predicate.
-                        for (nativeIterator.seek(revisionPrefix); 
nativeIterator.isValid(); nativeIterator.next()) {
-                            empty = false;
+                    // Check all keys to see if any one of them match the 
predicate.
+                    for (; nativeIterator.isValid(); nativeIterator.next()) {
+                        byte[] rocksKey = nativeIterator.key();
 
-                            byte[] key = rocksKeyToBytes(nativeIterator.key());
+                        byte[] key = rocksKeyToBytes(rocksKey);
 
-                            if (predicate.test(key)) {
-                                // Current revision matches.
-                                nextRetRev = curRev;
+                        if (predicate.test(key)) {
+                            checkIterator(nativeIterator);

Review comment:
       there's no need to call `checkIterator` here, this method must only be 
called of `isValid` returns `false`

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
##########
@@ -125,50 +125,41 @@ public boolean hasNext() {
                 storage.lock().readLock().lock();
 
                 try {
-                    if (nextRetRev != -1) {
-                        // Next revision is already calculated and is not -1, 
meaning that there is a set of keys
-                        // matching the revision and the predicate.
+                    if (currentHasNext) {
                         return true;
                     }
 
-                    while (true) {
-                        long curRev = lastRetRev + 1;
-
-                        byte[] revisionPrefix = longToBytes(curRev);
-
-                        boolean empty = true;
+                    if (!nativeIterator.isValid()) {
+                        try {
+                            nativeIterator.refresh();
 
-                        if (!nativeIterator.isValid()) {
-                            try {
-                                nativeIterator.refresh();
-                            } catch (RocksDBException e) {
-                                throw new IgniteInternalException(e);
-                            }
+                            nativeIterator.seek(longToBytes(currentRevision));
+                        } catch (RocksDBException e) {
+                            throw new IgniteInternalException(e);
                         }
+                    }
 
-                        // Check all keys by the revision to see if any one of 
them match the predicate.
-                        for (nativeIterator.seek(revisionPrefix); 
nativeIterator.isValid(); nativeIterator.next()) {
-                            empty = false;
+                    // Check all keys to see if any one of them match the 
predicate.
+                    for (; nativeIterator.isValid(); nativeIterator.next()) {

Review comment:
       I suggest using `RocksUtils.find` method here

##########
File path: 
modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
##########
@@ -1951,11 +1950,48 @@ public void rangeCursor() {
 
             fail();
         } catch (NoSuchElementException e) {
-            System.out.println();
             // No-op.
         }
     }
 
+    @Test
+    public void watchCursorLexicographicTest() {
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        byte[] key = key(0);
+        byte[] val = keyValue(0, 0);
+
+        int count = 1000; // Exceeds 1 byte
+
+        for (int i = 0; i < count; i++) {
+            storage.put(key, val);
+        }
+
+        assertEquals(count, storage.revision());
+        assertEquals(count, storage.updateCounter());
+
+        Cursor<WatchEvent> cur = storage.watch(key, 1);
+
+        Iterator<WatchEvent> it = cur.iterator();
+
+        int i = 1;
+
+        while (it.hasNext()) {

Review comment:
       Should you also check that you receive exactly `1000` events?

##########
File path: 
modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
##########
@@ -1951,11 +1950,48 @@ public void rangeCursor() {
 
             fail();
         } catch (NoSuchElementException e) {
-            System.out.println();
             // No-op.
         }
     }
 
+    @Test
+    public void watchCursorLexicographicTest() {
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        byte[] key = key(0);
+        byte[] val = keyValue(0, 0);
+
+        int count = 1000; // Exceeds 1 byte
+
+        for (int i = 0; i < count; i++) {
+            storage.put(key, val);
+        }
+
+        assertEquals(count, storage.revision());
+        assertEquals(count, storage.updateCounter());
+
+        Cursor<WatchEvent> cur = storage.watch(key, 1);
+
+        Iterator<WatchEvent> it = cur.iterator();

Review comment:
       1. `Cursor` implements `Iterable`, you can use `for-each` with it.
   2. `Cursor` must be used inside try-with-resources

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
##########
@@ -125,50 +125,41 @@ public boolean hasNext() {
                 storage.lock().readLock().lock();
 
                 try {
-                    if (nextRetRev != -1) {
-                        // Next revision is already calculated and is not -1, 
meaning that there is a set of keys
-                        // matching the revision and the predicate.
+                    if (currentHasNext) {
                         return true;
                     }
 
-                    while (true) {
-                        long curRev = lastRetRev + 1;
-
-                        byte[] revisionPrefix = longToBytes(curRev);
-
-                        boolean empty = true;
+                    if (!nativeIterator.isValid()) {
+                        try {
+                            nativeIterator.refresh();

Review comment:
       Why do you call `refresh` if iterator is not valid? Can you point me to 
the documentation about that?

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
##########
@@ -181,58 +172,51 @@ public WatchEvent next() {
                 storage.lock().readLock().lock();
 
                 try {
-                    while (true) {
-                        if (!hasNext()) {
-                            return null;
-                        }
-
-                        var ref = new Object() {
-                            boolean noItemsInRevision = true;
-                        };
+                    if (!hasNext()) {
+                        return null;
+                    }
 
-                        List<EntryEvent> evts = new ArrayList<>();
+                    List<EntryEvent> evts = new ArrayList<>();
 
-                        // Iterate over the keys of the current revision and 
get all matching entries.
-                        RocksUtils.forEach(nativeIterator, (k, v) -> {
-                            ref.noItemsInRevision = false;
+                    // Iterate over the keys of the current revision and get 
all matching entries.
+                    for (; nativeIterator.isValid(); nativeIterator.next()) {
+                        byte[] rocksKey = nativeIterator.key();
+                        byte[] rocksValue = nativeIterator.value();
 
-                            byte[] key = rocksKeyToBytes(k);
+                        long revision = revisionFromRocksKey(rocksKey);
 
-                            Value val = bytesToValue(v);
+                        if (revision > currentRevision) {
+                            // There are no more keys for the current revision

Review comment:
       Should we save this revision, like `currentRevision = revision`?

##########
File path: 
modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
##########
@@ -2167,6 +2203,63 @@ public void watchCursorForKey() {
         assertFalse(it.hasNext());
     }
 
+    @Test
+    public void watchCursorForKeySkipNonMatchingEntries() {
+        byte[] key1 = key(1);
+        final byte[] val1_1 = keyValue(1, 11);

Review comment:
       we do not use `final` on local variables

##########
File path: 
modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
##########
@@ -2167,6 +2203,63 @@ public void watchCursorForKey() {
         assertFalse(it.hasNext());
     }
 
+    @Test
+    public void watchCursorForKeySkipNonMatchingEntries() {
+        byte[] key1 = key(1);
+        final byte[] val1_1 = keyValue(1, 11);
+        final byte[] val1_2 = keyValue(1, 12);
+
+        final byte[] key2 = key(2);
+        final byte[] val2 = keyValue(2, 21);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        Cursor<WatchEvent> cur = storage.watch(key2, 1);

Review comment:
       Same here about `try-with-resources`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to