sashapolo commented on code in PR #3042:
URL: https://github.com/apache/ignite-3/pull/3042#discussion_r1450455297


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -121,12 +105,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
+    private final AtomicLong appliedRevision = new AtomicLong(0L);

Review Comment:
   What was wrong with `volatile long`?
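
   For context on this question: a `volatile long` already gives atomic, visible reads and writes of the 64-bit value, so it is enough when the field is only ever assigned, as with the `appliedRevision.set(revision)` call visible in this PR. `AtomicLong` earns its keep when an update depends on the current value. A minimal sketch of the two cases, assuming a single plain write per update; the class and method names are hypothetical and do not come from the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder (not the PR's class): plain assignment only needs volatile. */
class VolatileRevisionHolder {
    private volatile long appliedRevision;

    void onRevisionApplied(long revision) {
        // A single write; volatile makes it atomic and visible to other threads.
        appliedRevision = revision;
    }

    long appliedRevision() {
        return appliedRevision;
    }
}

/** Hypothetical holder: AtomicLong is useful when the new value depends on the old one. */
class MonotonicRevisionHolder {
    private final AtomicLong appliedRevision = new AtomicLong();

    void onRevisionApplied(long revision) {
        // Atomic read-modify-write: the stored revision never moves backwards.
        appliedRevision.accumulateAndGet(revision, Math::max);
    }

    long appliedRevision() {
        return appliedRevision.get();
    }
}
```

   If the field is only ever overwritten with the latest revision, the first variant is the simpler choice; the second applies only if out-of-order updates must be tolerated.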



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -121,12 +105,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
+    private final AtomicLong appliedRevision = new AtomicLong(0L);
+
     /**
      * Future which completes when MetaStorage manager finished local recovery. The value of the future is the revision which must be used
      * for state recovery by other components.
      */
     private final CompletableFuture<Long> recoveryFinishedFuture = new CompletableFuture<>();
 
+    private final CompletableFuture<Long> recoveryFinishedPublicFuture
+            = recoveryFinishedFuture.whenComplete((revision, e) -> appliedRevision.set(revision));

Review Comment:
   Maybe you intended to use `thenAccept` here?
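
   One reading of this suggestion: the `whenComplete` action also runs when the source future completes exceptionally, in which case `revision` is `null` and unboxing it inside the callback throws a `NullPointerException`. `thenAccept` runs only on success, but its result is a `CompletableFuture<Void>`; `thenApply` is an alternative if the derived future must still carry the revision. The sketch below is an illustration under those assumptions, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper class, not part of the PR. */
class RecoveryFutureSketch {
    /** Records the revision on success only and keeps the Long result for callers. */
    static CompletableFuture<Long> publishRevision(
            CompletableFuture<Long> recoveryFinished, AtomicLong appliedRevision) {
        return recoveryFinished.thenApply(revision -> {
            appliedRevision.set(revision);
            return revision;
        });
    }

    /** Same side effect with thenAccept; the derived future carries no value. */
    static CompletableFuture<Void> recordRevision(
            CompletableFuture<Long> recoveryFinished, AtomicLong appliedRevision) {
        // Not invoked if recoveryFinished completes exceptionally.
        return recoveryFinished.thenAccept(appliedRevision::set);
    }
}
```

   In both variants an exceptional completion of the source future propagates to the derived future without touching `appliedRevision`.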



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -284,16 +288,10 @@ private CompletableFuture<List<WatchAndEvents>> collectWatchesAndEvents(List<Ent
         }, watchExecutor);
     }
 
-    private CompletableFuture<Void> invokeOnRevisionCallback(List<WatchAndEvents> watchAndEventsList, long revision, HybridTimestamp time) {
+    private CompletableFuture<Void> invokeOnRevisionCallback(long revision, HybridTimestamp time) {
         try {
-            // Only notify about entries that have been accepted by at least one Watch.
-            var acceptedEntries = new HashSet<EntryEvent>();
-
-            for (WatchAndEvents watchAndEvents : watchAndEventsList) {
-                acceptedEntries.addAll(watchAndEvents.events);
-            }
-
-            var event = new WatchEvent(acceptedEntries, revision, time);
+            // We consciously put empty set here. Revision applied callback doesn't need any data.

Review Comment:
   I would suggest changing the `revisionCallback` interface to explicitly require only the revision and the timestamp.
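
   A rough sketch of what such a narrowed callback could look like, so that no empty `WatchEvent` has to be built just to carry two values. Every name here is hypothetical: `RevisionAppliedCallback`, `RevisionNotifier`, and `HybridTimestampStub` (a stand-in for Ignite's `HybridTimestamp` so the snippet compiles on its own) are assumptions, not the project's actual API.

```java
import java.util.concurrent.CompletableFuture;

/** Stand-in for Ignite's HybridTimestamp; an assumption made so the sketch is self-contained. */
final class HybridTimestampStub {
    final long value;

    HybridTimestampStub(long value) {
        this.value = value;
    }
}

/** Hypothetical callback shape: it receives only the revision and the timestamp. */
@FunctionalInterface
interface RevisionAppliedCallback {
    CompletableFuture<Void> onRevisionApplied(long revision, HybridTimestampStub time);
}

/** Hypothetical caller that mirrors the try/catch shape of invokeOnRevisionCallback. */
class RevisionNotifier {
    private final RevisionAppliedCallback callback;

    RevisionNotifier(RevisionAppliedCallback callback) {
        this.callback = callback;
    }

    CompletableFuture<Void> notifyRevisionApplied(long revision, HybridTimestampStub time) {
        try {
            return callback.onRevisionApplied(revision, time);
        } catch (Throwable t) {
            // Convert a synchronous failure into a failed future.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }
}
```

   With a signature like this, the "empty set" comment in the diff above would no longer be needed, because the callback contract itself states that no entry data is passed.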



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java:
##########
@@ -98,7 +99,7 @@ void testGroupEventNotification() {
 
         WatchEvent event = watchEventCaptor.getValue();
 
-        assertThat(event.entryEvents(), containsInAnyOrder(entryEvent1, entryEvent2));
+        assertThat(event.entryEvents(), is(equalTo(emptyList())));

Review Comment:
   I believe there should be a Matcher that looks something like `is(empty())`.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -105,7 +106,8 @@ public interface EntryReader {
     public WatchProcessor(String nodeName, EntryReader entryReader) {
         this.entryReader = entryReader;
 
-        this.watchExecutor = Executors.newFixedThreadPool(4, NamedThreadFactory.create(nodeName, "metastorage-watch-executor", LOG));
+        this.watchExecutor = Executors.newSingleThreadScheduledExecutor(

Review Comment:
   What is this change for?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

