sashapolo commented on code in PR #3042:
URL: https://github.com/apache/ignite-3/pull/3042#discussion_r1450455297
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -121,12 +105,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
/** Prevents double stopping of the component. */
private final AtomicBoolean isStopped = new AtomicBoolean();
+ private final AtomicLong appliedRevision = new AtomicLong(0L);
Review Comment:
what was wrong with `volatile long`?
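For context, a minimal sketch of when each would be needed. The class and method names are made up for illustration and are not from the PR. The diff only shows a plain `set(...)`, which a `volatile long` handles just as well; `AtomicLong` pays off only for an atomic read-modify-write such as "advance if greater":

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder, not the PR's class.
final class AppliedRevisionSketch {
    // Enough for plain publication: a write is visible to all later reads.
    private volatile long volatileRevision;

    // Needed only when the new value depends on the current one.
    private final AtomicLong atomicRevision = new AtomicLong();

    void publish(long revision) {
        volatileRevision = revision;
    }

    long published() {
        return volatileRevision;
    }

    // Atomically keeps the maximum of the stored and the given revision.
    long advance(long revision) {
        return atomicRevision.accumulateAndGet(revision, Math::max);
    }
}
```

If the field is only ever assigned and read, the two are interchangeable and `volatile long` is the lighter choice.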
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -121,12 +105,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
/** Prevents double stopping of the component. */
private final AtomicBoolean isStopped = new AtomicBoolean();
+ private final AtomicLong appliedRevision = new AtomicLong(0L);
+
/**
* Future which completes when MetaStorage manager finished local recovery. The value of the future is the revision which must be used
* for state recovery by other components.
*/
private final CompletableFuture<Long> recoveryFinishedFuture = new CompletableFuture<>();
+ private final CompletableFuture<Long> recoveryFinishedPublicFuture
+ = recoveryFinishedFuture.whenComplete((revision, e) -> appliedRevision.set(revision));
Review Comment:
maybe you intended to use `thenAccept` here?
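To illustrate the difference: `whenComplete` also runs its callback on failure, with `revision` set to `null`, so `appliedRevision.set(revision)` would throw a `NullPointerException` while unboxing. A minimal sketch, assuming made-up names (`RecoveryFutureSketch` and its methods are not from the PR). Since `thenAccept` returns `CompletableFuture<Void>`, the last method uses `thenApply` as one possible way to keep a `CompletableFuture<Long>`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of the PR.
final class RecoveryFutureSketch {
    // Counts callback invocations when the source future fails.
    static int whenCompleteCallsOnFailure() {
        AtomicInteger calls = new AtomicInteger();
        CompletableFuture<Long> source = new CompletableFuture<>();
        // Runs for both outcomes; on failure `revision` is null.
        source.whenComplete((revision, e) -> calls.incrementAndGet());
        source.completeExceptionally(new IllegalStateException("recovery failed"));
        return calls.get();
    }

    static int thenAcceptCallsOnFailure() {
        AtomicInteger calls = new AtomicInteger();
        CompletableFuture<Long> source = new CompletableFuture<>();
        // Runs only on successful completion.
        source.thenAccept(revision -> calls.incrementAndGet());
        source.completeExceptionally(new IllegalStateException("recovery failed"));
        return calls.get();
    }

    // Success-only side effect that still yields a CompletableFuture<Long>.
    static long appliedAfterSuccess(long revision) {
        AtomicLong applied = new AtomicLong();
        CompletableFuture<Long> source = new CompletableFuture<>();
        CompletableFuture<Long> publicFuture = source.thenApply(rev -> {
            applied.set(rev);
            return rev;
        });
        source.complete(revision);
        return publicFuture.join() == revision ? applied.get() : -1L;
    }
}
```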
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -284,16 +288,10 @@ private CompletableFuture<List<WatchAndEvents>> collectWatchesAndEvents(List<Ent
}, watchExecutor);
}
- private CompletableFuture<Void> invokeOnRevisionCallback(List<WatchAndEvents> watchAndEventsList, long revision, HybridTimestamp time) {
+ private CompletableFuture<Void> invokeOnRevisionCallback(long revision, HybridTimestamp time) {
try {
- // Only notify about entries that have been accepted by at least one Watch.
- var acceptedEntries = new HashSet<EntryEvent>();
-
- for (WatchAndEvents watchAndEvents : watchAndEventsList) {
- acceptedEntries.addAll(watchAndEvents.events);
- }
-
- var event = new WatchEvent(acceptedEntries, revision, time);
+ // We consciously put empty set here. Revision applied callback doesn't need any data.
Review Comment:
I would suggest changing the `revisionCallback` interface to explicitly
require only the revision and the timestamp.
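A rough sketch of what such an interface could look like. Everything here is hypothetical: `RevisionAppliedCallback`, `TimestampStandIn` (a placeholder for `HybridTimestamp` so the snippet compiles on its own) and `RevisionCallbackSketch` are illustrative names, not the existing API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for HybridTimestamp, only to keep the sketch self-contained.
final class TimestampStandIn {
    final long value;

    TimestampStandIn(long value) {
        this.value = value;
    }
}

// Hypothetical callback shape: no WatchEvent, just the two values that are actually used.
interface RevisionAppliedCallback {
    CompletableFuture<Void> onRevisionApplied(long revision, TimestampStandIn time);
}

final class RevisionCallbackSketch {
    // Invokes a callback that records the revision and returns what it recorded.
    static long notifyAndRead(long revision) {
        AtomicLong lastRevision = new AtomicLong(-1L);
        RevisionAppliedCallback callback = (rev, time) -> {
            lastRevision.set(rev);
            return CompletableFuture.completedFuture(null);
        };
        callback.onRevisionApplied(revision, new TimestampStandIn(0L)).join();
        return lastRevision.get();
    }
}
```

With a signature like this there would be no need to build a `WatchEvent` with an empty set just to carry the revision and the timestamp.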
##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java:
##########
@@ -98,7 +99,7 @@ void testGroupEventNotification() {
WatchEvent event = watchEventCaptor.getValue();
- assertThat(event.entryEvents(), containsInAnyOrder(entryEvent1, entryEvent2));
+ assertThat(event.entryEvents(), is(equalTo(emptyList())));
Review Comment:
I believe there should be a Matcher that looks something like `is(empty())`.
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -105,7 +106,8 @@ public interface EntryReader {
public WatchProcessor(String nodeName, EntryReader entryReader) {
this.entryReader = entryReader;
- this.watchExecutor = Executors.newFixedThreadPool(4, NamedThreadFactory.create(nodeName, "metastorage-watch-executor", LOG));
+ this.watchExecutor = Executors.newSingleThreadScheduledExecutor(
Review Comment:
What is this change for?