sashapolo commented on a change in pull request #675:
URL: https://github.com/apache/ignite-3/pull/675#discussion_r810998790
##########
File path:
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
##########
@@ -80,163 +75,122 @@
WatchCursor(RocksDbKeyValueStorage storage, long rev, Predicate<byte[]>
predicate) {
this.storage = storage;
this.predicate = predicate;
- this.lastRetRev = rev - 1;
- this.nativeIterator = storage.newDataIterator(options);
- this.it = createIterator();
- }
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- return it.hasNext();
- }
-
- /** {@inheritDoc} */
- @Nullable
- @Override
- public WatchEvent next() {
- return it.next();
- }
+ this.currentRevision = rev;
- /** {@inheritDoc} */
- @Override
- public void close() throws Exception {
- IgniteUtils.closeAll(options, nativeIterator);
+ this.nativeIterator = storage.newDataIterator(options);
+ this.nativeIterator.seek(longToBytes(rev));
}
/** {@inheritDoc} */
- @NotNull
@Override
- public Iterator<WatchEvent> iterator() {
- return it;
- }
+ public boolean hasNext() {
+ storage.lock().readLock().lock();
- /**
- * Creates an iterator for this cursor.
- *
- * @return Iterator.
- */
- @NotNull
- private Iterator<WatchEvent> createIterator() {
- return new Iterator<>() {
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- storage.lock().readLock().lock();
-
- try {
- if (nextRetRev != -1) {
- // Next revision is already calculated and is not -1,
meaning that there is a set of keys
- // matching the revision and the predicate.
- return true;
- }
+ try {
+ if (currentHasNext) {
+ return true;
+ }
- while (true) {
- long curRev = lastRetRev + 1;
+ if (!nativeIterator.isValid()) {
+ nativeIterator.refresh();
- byte[] revisionPrefix = longToBytes(curRev);
+ nativeIterator.seek(longToBytes(currentRevision));
+ }
- boolean empty = true;
+ // Check all keys to see if any one of them match the predicate.
+ currentHasNext = RocksUtils.find(nativeIterator, (rocksKey, value)
-> {
+ byte[] key = rocksKeyToBytes(rocksKey);
- if (!nativeIterator.isValid()) {
- try {
- nativeIterator.refresh();
- } catch (RocksDBException e) {
- throw new IgniteInternalException(e);
- }
- }
+ if (predicate.test(key)) {
+ // We may have jumped to the next revision if there were
no matching keys in previous.
+ currentRevision = revisionFromRocksKey(rocksKey);
- // Check all keys by the revision to see if any one of
them match the predicate.
- for (nativeIterator.seek(revisionPrefix);
nativeIterator.isValid(); nativeIterator.next()) {
- empty = false;
+ return true;
+ }
- byte[] key = rocksKeyToBytes(nativeIterator.key());
+ return false;
+ });
- if (predicate.test(key)) {
- // Current revision matches.
- nextRetRev = curRev;
+ return currentHasNext;
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ } finally {
+ storage.lock().readLock().unlock();
+ }
+ }
- return true;
- }
- }
+ /** {@inheritDoc} */
+ @Nullable
+ @Override
+ public WatchEvent next() {
+ storage.lock().readLock().lock();
- checkIterator(nativeIterator);
+ try {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
- if (empty) {
- return false;
- }
+ List<EntryEvent> evts = new ArrayList<>();
- // Go to the next revision.
- lastRetRev++;
- }
- } finally {
- storage.lock().readLock().unlock();
- }
- }
+ long lastSeenRevision = currentRevision;
- /** {@inheritDoc} */
- @Nullable
- @Override
- public WatchEvent next() {
- storage.lock().readLock().lock();
+ // Iterate over the keys of the current revision and get all
matching entries.
+ for (; nativeIterator.isValid(); nativeIterator.next()) {
+ byte[] rocksKey = nativeIterator.key();
+ byte[] rocksValue = nativeIterator.value();
- try {
- while (true) {
- if (!hasNext()) {
- return null;
- }
+ long revision = revisionFromRocksKey(rocksKey);
- var ref = new Object() {
- boolean noItemsInRevision = true;
- };
+ lastSeenRevision = revision;
- List<EntryEvent> evts = new ArrayList<>();
+ if (revision > currentRevision) {
+ // There are no more keys for the current revision
+ break;
+ }
- // Iterate over the keys of the current revision and
get all matching entries.
- RocksUtils.forEach(nativeIterator, (k, v) -> {
- ref.noItemsInRevision = false;
+ byte[] key = rocksKeyToBytes(rocksKey);
- byte[] key = rocksKeyToBytes(k);
+ if (predicate.test(key)) {
+ Value val = bytesToValue(rocksValue);
- Value val = bytesToValue(v);
+ Entry newEntry;
- if (predicate.test(key)) {
- Entry newEntry;
+ if (val.tombstone()) {
+ newEntry = Entry.tombstone(key, revision,
val.updateCounter());
+ } else {
+ newEntry = new Entry(key, val.bytes(), revision,
val.updateCounter());
+ }
- if (val.tombstone()) {
- newEntry = Entry.tombstone(key,
nextRetRev, val.updateCounter());
- } else {
- newEntry = new Entry(key, val.bytes(),
nextRetRev, val.updateCounter());
- }
+ Entry oldEntry = storage.doGet(key, revision - 1, false);
- Entry oldEntry = storage.doGet(key, nextRetRev
- 1, false);
+ evts.add(new EntryEvent(oldEntry, newEntry));
+ }
+ }
- evts.add(new EntryEvent(oldEntry, newEntry));
- }
- });
+ currentHasNext = false;
- if (ref.noItemsInRevision) {
- return null;
- }
+ // Go to the next revision
+ currentRevision = lastSeenRevision > currentRevision ?
lastSeenRevision : currentRevision + 1;
- if (evts.isEmpty()) {
- continue;
- }
+ checkIterator(nativeIterator);
- // Set the last returned revision to the current
revision's value.
- lastRetRev = nextRetRev;
+ return new WatchEvent(evts);
+ } finally {
+ storage.lock().readLock().unlock();
+ }
+ }
- // Set current revision to -1, meaning that it is not
found yet.
- nextRetRev = -1;
+ /** {@inheritDoc} */
+ @Override
+ public void close() throws Exception {
+ IgniteUtils.closeAll(options, nativeIterator);
+ }
- return new WatchEvent(evts);
- }
- } catch (RocksDBException e) {
- throw new IgniteInternalException(e);
- } finally {
- storage.lock().readLock().unlock();
- }
- }
- };
+ /** {@inheritDoc} */
+ @NotNull
Review comment:
This annotation is not needed
##########
File path:
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
##########
@@ -80,163 +75,122 @@
WatchCursor(RocksDbKeyValueStorage storage, long rev, Predicate<byte[]>
predicate) {
this.storage = storage;
this.predicate = predicate;
- this.lastRetRev = rev - 1;
- this.nativeIterator = storage.newDataIterator(options);
- this.it = createIterator();
- }
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- return it.hasNext();
- }
-
- /** {@inheritDoc} */
- @Nullable
- @Override
- public WatchEvent next() {
- return it.next();
- }
+ this.currentRevision = rev;
- /** {@inheritDoc} */
- @Override
- public void close() throws Exception {
- IgniteUtils.closeAll(options, nativeIterator);
+ this.nativeIterator = storage.newDataIterator(options);
+ this.nativeIterator.seek(longToBytes(rev));
}
/** {@inheritDoc} */
- @NotNull
@Override
- public Iterator<WatchEvent> iterator() {
- return it;
- }
+ public boolean hasNext() {
+ storage.lock().readLock().lock();
- /**
- * Creates an iterator for this cursor.
- *
- * @return Iterator.
- */
- @NotNull
- private Iterator<WatchEvent> createIterator() {
- return new Iterator<>() {
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- storage.lock().readLock().lock();
-
- try {
- if (nextRetRev != -1) {
- // Next revision is already calculated and is not -1,
meaning that there is a set of keys
- // matching the revision and the predicate.
- return true;
- }
+ try {
+ if (currentHasNext) {
+ return true;
+ }
- while (true) {
- long curRev = lastRetRev + 1;
+ if (!nativeIterator.isValid()) {
+ nativeIterator.refresh();
- byte[] revisionPrefix = longToBytes(curRev);
+ nativeIterator.seek(longToBytes(currentRevision));
+ }
- boolean empty = true;
+ // Check all keys to see if any one of them match the predicate.
+ currentHasNext = RocksUtils.find(nativeIterator, (rocksKey, value)
-> {
+ byte[] key = rocksKeyToBytes(rocksKey);
- if (!nativeIterator.isValid()) {
- try {
- nativeIterator.refresh();
- } catch (RocksDBException e) {
- throw new IgniteInternalException(e);
- }
- }
+ if (predicate.test(key)) {
+ // We may have jumped to the next revision if there were
no matching keys in previous.
+ currentRevision = revisionFromRocksKey(rocksKey);
- // Check all keys by the revision to see if any one of
them match the predicate.
- for (nativeIterator.seek(revisionPrefix);
nativeIterator.isValid(); nativeIterator.next()) {
- empty = false;
+ return true;
+ }
- byte[] key = rocksKeyToBytes(nativeIterator.key());
+ return false;
+ });
- if (predicate.test(key)) {
- // Current revision matches.
- nextRetRev = curRev;
+ return currentHasNext;
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ } finally {
+ storage.lock().readLock().unlock();
+ }
+ }
- return true;
- }
- }
+ /** {@inheritDoc} */
+ @Nullable
Review comment:
This is not longer nullable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]