ibessonov commented on code in PR #1869:
URL: https://github.com/apache/ignite-3/pull/1869#discussion_r1161590847


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java:
##########
@@ -217,6 +219,11 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> 
iter) {
         }
     }
 
+    @Override
+    public void onBeforeApply(Command command) {
+        writeHandler.beforeApply(command);

Review Comment:
   Same question here, why do we modify command in metastorage listener?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java:
##########
@@ -258,6 +275,21 @@ public Publisher<Entry> prefix(ByteArray prefix, long 
revUpperBound) {
         return new CursorPublisher(context, createPrefixCommand);
     }
 
+    /**
+     * Sends idle safe time sync message. Should be called only on the leader 
node.
+     *
+     * @param safeTime New safe time.
+     * @return Future that will be completed when message is sent.
+     */
+    public CompletableFuture<Void> syncTime(HybridTimestamp safeTime) {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19199 Only 
propagate safe time when ms is idle

Review Comment:
   So, the "idle" sync is not implemented yet, is that correct?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Cluster time implementation with additional methods to adjust time and 
update safe time.
+ */
+public class ClusterTimeImpl implements ClusterTime {
+    private final IgniteSpinBusyLock busyLock;
+
+    private volatile @Nullable LeaderTimer leaderTimer;
+
+    private final HybridClock clock;
+
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+    /**
+     * Constructor.
+     *
+     * @param busyLock Busy lock.
+     * @param clock Node's hybrid clock.
+     */
+    public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+        this.busyLock = busyLock;
+        this.clock = clock;
+        this.safeTime = new PendingComparableValuesTracker<>(clock.now());

Review Comment:
   I believe that we have a very nice bug here. Why is it safe to initialize 
safeTime as `now()` during the node start?
   The answer is - it's not safe. Metastorage safe time should be far in the 
past during the start, to preserve the monotony of time in new entries (during 
rebalance, in particular)



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java:
##########
@@ -151,15 +155,22 @@ private DistributionZoneManager 
prepareDistributionZoneManager() {
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
 
-        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage);
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class));
 
         RaftGroupService metaStorageService = mock(RaftGroupService.class);
 
+        HybridTimestampMessage mockTsMessage = 
mock(HybridTimestampMessage.class);
+        when(mockTsMessage.asHybridTimestamp()).thenReturn(new 
HybridTimestamp(10, 10));
+
         // Delegate directly to listener.
         lenient().doAnswer(
                 invocationClose -> {

Review Comment:
   As far as I'm aware, @sashapolo is removing the copy-paste. So one of you 
will have to fix conflicts



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java:
##########
@@ -56,6 +58,11 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> 
iter) {
         }
     }
 
+    @Override
+    public void onBeforeApply(Command command) {
+        writeHandler.beforeApply(command);

Review Comment:
   Why do we modify the command on learners? Command should already be correct, 
otherwise I simply don't get it.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.network.annotations.WithSetter;
+
+/** Base meta storage write command. */
+public interface MetaStorageWriteCommand extends WriteCommand {
+    /**
+     * Returns time on the initiator node.
+     */
+    HybridTimestampMessage initiatorTime();
+
+    /**
+     * This is a dirty hack. This time is set by the leader node to 
disseminate new safe time across
+     * followers and learners. Leader of the ms group reads {@link 
#initiatorTime()}, adjusts its clock
+     * and sets safeTime as {@link HybridClock#now()} as safeTime here. This 
must be done before
+     * command is saved into the Raft log (see {@link 
org.apache.ignite.internal.raft.service.RaftGroupListener#onBeforeApply(Command)}.
+     */
+    @WithSetter
+    HybridTimestampMessage safeTime();
+
+    /**
+     * Setter for the safeTime field.
+     */
+    default void safeTime(HybridTimestampMessage safeTime) {

Review Comment:
   Opps, I didn't yet read that one



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java:
##########
@@ -375,4 +382,40 @@ void 
testLearnerLeaveAndJoinBecauseOfNetworkPartition(TestInfo testInfo) throws
 
         assertTrue(waitForCondition(() -> 
firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 
10_000));
     }
+
+    /**
+     * Tests that safe time is propagated from the leader to the 
follower/learner.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSafeTimePropagation(boolean useFollower, TestInfo testInfo) 
throws Exception {

Review Comment:
   @tkalkirill could you please clarify what you meant?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Cluster time implementation with additional methods to adjust time and 
update safe time.
+ */
+public class ClusterTimeImpl implements ClusterTime {
+    private final IgniteSpinBusyLock busyLock;
+
+    private volatile @Nullable LeaderTimer leaderTimer;
+
+    private final HybridClock clock;
+
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+    /**
+     * Constructor.
+     *
+     * @param busyLock Busy lock.
+     * @param clock Node's hybrid clock.
+     */
+    public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+        this.busyLock = busyLock;
+        this.clock = clock;
+        this.safeTime = new PendingComparableValuesTracker<>(clock.now());
+    }
+
+    /**
+     * Starts sync time scheduler.
+     *
+     * @param service MetaStorage service that is used by scheduler to sync 
time.
+     */
+    public void startLeaderTimer(MetaStorageServiceImpl service) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            assert leaderTimer == null;
+
+            LeaderTimer newTimer = new LeaderTimer(service);
+
+            leaderTimer = newTimer;
+
+            newTimer.start();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Stops sync time scheduler if it exists.
+     */
+    public void stopLeaderTimer() {
+        LeaderTimer timer = leaderTimer;
+
+        if (timer != null) {
+            timer.stop();
+
+            leaderTimer = null;
+        }
+    }
+
+    @Override
+    public HybridTimestamp now() {
+        return clock.now();
+    }
+
+    @Override
+    public CompletableFuture<Void> waitFor(HybridTimestamp time) {
+        return safeTime.waitFor(time);
+    }
+
+    /**
+     * Updates the internal safe time.
+     *
+     * @param newValue New safe time value.
+     */
+    public void updateSafeTime(HybridTimestamp newValue) {

Review Comment:
   Can this one and the next methods be combined into one?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Cluster time implementation with additional methods to adjust time and 
update safe time.
+ */
+public class ClusterTimeImpl implements ClusterTime {
+    private final IgniteSpinBusyLock busyLock;
+
+    private volatile @Nullable LeaderTimer leaderTimer;
+
+    private final HybridClock clock;
+
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+    /**
+     * Constructor.
+     *
+     * @param busyLock Busy lock.
+     * @param clock Node's hybrid clock.
+     */
+    public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+        this.busyLock = busyLock;
+        this.clock = clock;
+        this.safeTime = new PendingComparableValuesTracker<>(clock.now());
+    }
+
+    /**
+     * Starts sync time scheduler.
+     *
+     * @param service MetaStorage service that is used by scheduler to sync 
time.
+     */
+    public void startLeaderTimer(MetaStorageServiceImpl service) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            assert leaderTimer == null;
+
+            LeaderTimer newTimer = new LeaderTimer(service);
+
+            leaderTimer = newTimer;
+
+            newTimer.start();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Stops sync time scheduler if it exists.
+     */
+    public void stopLeaderTimer() {
+        LeaderTimer timer = leaderTimer;

Review Comment:
   No busy lock here, why?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -234,4 +255,18 @@ private static RevisionCondition.Type 
toRevisionConditionType(ConditionType type
                 throw new IllegalArgumentException("Unexpected revision 
condition type " + type);
         }
     }
+
+    void beforeApply(Command command) {
+        if (command instanceof MetaStorageWriteCommand) {
+            // Initiator sends us a timestamp to adjust to.
+            // Alter command by setting safe time based on the adjusted clock.
+            clusterTime.adjust(((MetaStorageWriteCommand) 
command).initiatorTime().asHybridTimestamp());

Review Comment:
   I think you may extract `(MetaStorageWriteCommand) command` into a variable, 
but it's up to you



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -234,4 +253,18 @@ private static RevisionCondition.Type 
toRevisionConditionType(ConditionType type
                 throw new IllegalArgumentException("Unexpected revision 
condition type " + type);
         }
     }
+
+    void beforeApply(Command command) {

Review Comment:
   Why do we call it in learner listener?



##########
modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java:
##########
@@ -142,11 +142,12 @@ private void startPlacementDriverManager() throws 
NodeStoppingException {
                 eventsClientListener
         );
 
+        HybridClockImpl nodeClock = new HybridClockImpl();

Review Comment:
   You did it again :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to