sashapolo commented on code in PR #3042:
URL: https://github.com/apache/ignite-3/pull/3042#discussion_r1450576952


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -174,7 +173,7 @@ public CompletableFuture<Void> notifyWatches(List<Entry> 
updatedEntries, HybridT
                                 CompletableFuture<Void> 
notifyUpdateRevisionFuture = notifyUpdateRevisionListeners(newRevision);
 
                                 CompletableFuture<Void> notificationFuture = 
allOf(notifyWatchesFuture, notifyUpdateRevisionFuture)
-                                        .thenComposeAsync(
+                                        .thenAcceptAsync(

Review Comment:
   you can use `thenRunAsync` instead



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -121,12 +105,17 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
+    private final AtomicLong appliedRevision = new AtomicLong(0L);

Review Comment:
   > primitive fields won't allow using {@code Mockito#spy(metastorage)}
   
   What do you mean?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -105,7 +106,8 @@ public interface EntryReader {
     public WatchProcessor(String nodeName, EntryReader entryReader) {
         this.entryReader = entryReader;
 
-        this.watchExecutor = Executors.newFixedThreadPool(4, 
NamedThreadFactory.create(nodeName, "metastorage-watch-executor", LOG));
+        this.watchExecutor = Executors.newSingleThreadScheduledExecutor(

Review Comment:
   If you are going to leave this change, then the whole class should be 
refactored a bit



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -288,23 +287,15 @@ private CompletableFuture<List<WatchAndEvents>> 
collectWatchesAndEvents(List<Ent
         }, watchExecutor);
     }
 
-    private CompletableFuture<Void> invokeOnRevisionCallback(long revision, 
HybridTimestamp time) {
+    private void invokeOnRevisionCallback(long revision, HybridTimestamp time) 
{
         try {
-            // We consciously put empty set here. Revision applied callback 
doesn't need any data.
-            var event = new WatchEvent(emptySet(), revision, time);
-
             revisionCallback.onSafeTimeAdvanced(time);
 
-            return revisionCallback.onRevisionApplied(event)
-                    .whenComplete((ignored, e) -> {
-                        if (e != null) {
-                            LOG.error("Error occurred when notifying watches", 
e);
-                        }
-                    });
+            revisionCallback.onRevisionApplied(revision);
         } catch (Throwable e) {
             LOG.error("Error occurred when notifying watches", e);

Review Comment:
   I think we can remove this catch and use `whenComplete` above to log any 
errors



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to