tkalkirill commented on code in PR #1869:
URL: https://github.com/apache/ignite-3/pull/1869#discussion_r1159259377
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java:
##########
@@ -84,4 +84,8 @@ public interface MetastorageCommandsMessageGroup {
/** Message type for {@link CloseAllCursorsCommand}. */
short CLOSE_ALL_CURSORS = 64;
+
+ short HYBRID_TS = 65;
+
+ short SYNC_TIME = 66;
Review Comment:
Missing javadoc
##########
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java:
##########
@@ -68,10 +67,9 @@ public HybridTimestamp(long physical, int logical) {
* @param times Times for comparing.
* @return The highest hybrid timestamp.
*/
- public static @Nullable HybridTimestamp max(HybridTimestamp... times) {
- if (times.length == 0) {
- return null;
- }
+ public static HybridTimestamp max(HybridTimestamp... times) {
+ assert times != null;
+ assert times.length > 0;
Review Comment:
Please indicate it (`times.length > 0`) in the documentation and also
correct the description of the method itself.
Or maybe `throw new IllegalArgumentException` ?
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java:
##########
@@ -84,4 +84,8 @@ public interface MetastorageCommandsMessageGroup {
/** Message type for {@link CloseAllCursorsCommand}. */
short CLOSE_ALL_CURSORS = 64;
+
+ short HYBRID_TS = 65;
Review Comment:
Missing javadoc
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java:
##########
@@ -156,11 +178,16 @@ private void executeIfLeader(OnLeaderAction action) {
}
private CompletableFuture<Void> executeIfLeaderImpl(OnLeaderAction action)
{
+ return executeWithStatus((service, term, isLeader) ->
action.apply(service, term));
Review Comment:
It seems that here it is necessary to return `completedFuture(null)` if not
the leader.
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -109,6 +112,8 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
/** Prevents double stopping of the component. */
private final AtomicBoolean isStopped = new AtomicBoolean();
+ private final ClusterTimeImpl clusterTime;
Review Comment:
I think we should also stop at `MetaStorageManagerImpl#stop`
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java:
##########
@@ -128,6 +146,10 @@ private interface OnLeaderAction {
CompletableFuture<Void> apply(MetaStorageServiceImpl service, long
term);
}
+ private interface OnStatusAction {
Review Comment:
Missing `FunctionalInterface`.
Can get rid of the `OnLeaderAction`?
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Cluster time with a hybrid clock instance and access to safe time.
+ */
+public interface ClusterTime {
+ /**
+ * Returns current cluster time.
+ *
+ * @return Current cluster time.
Review Comment:
```suggestion
```
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java:
##########
@@ -258,6 +275,20 @@ public Publisher<Entry> prefix(ByteArray prefix, long
revUpperBound) {
return new CursorPublisher(context, createPrefixCommand);
}
+ /**
+ * Sends safe time sync message. Should be called only on the leader node.
+ *
+ * @param safeTime New safe time.
+ * @return Future that will be completed when message is sent.
+ */
+ public CompletableFuture<Void> syncTime(HybridTimestamp safeTime) {
Review Comment:
I noticed that you do not use the return value, would it not be a mistake to
always send a new safeTime even if we did not process the previous one.
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java:
##########
@@ -56,6 +58,11 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>>
iter) {
}
}
+ @Override
+ public void onBeforeApply(Command command) {
+ writeHandler.beforeApply(command);
Review Comment:
Why before and not on command execution?
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Cluster time implementation with additional methods to adjust time and
update safe time.
+ */
+public class ClusterTimeImpl implements ClusterTime {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(ClusterTimeImpl.class);
+
+ private final IgniteSpinBusyLock busyLock;
+
+ private volatile @Nullable LeaderTimer leaderTimer;
+
+ private final HybridClock clock;
+
+ private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+ /**
+ * Constructor.
+ *
+ * @param busyLock Busy lock.
+ * @param clock Node's hybrid clock.
+ */
+ public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+ this.busyLock = busyLock;
+ this.clock = clock;
+ this.safeTime = new PendingComparableValuesTracker<>(clock.now());
+ }
+
+ /**
+ * Starts sync time scheduler.
+ *
+ * @param service MetaStorage service that is used by scheduler to sync
time.
+ */
+ public void startLeaderTimer(MetaStorageServiceImpl service) {
+ if (!busyLock.enterBusy()) {
+ return;
+ }
+
+ try {
+ assert leaderTimer == null;
+
+ LeaderTimer newTimer = new LeaderTimer(service);
+
+ leaderTimer = newTimer;
+
+ newTimer.start();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Stops sync time scheduler.
+ */
+ public void stopLeaderTimer() {
+ LeaderTimer timer = leaderTimer;
+
+ assert timer != null;
+
+ timer.stop();
+
+ leaderTimer = null;
+ }
+
+ @Override
+ public HybridTimestamp now() {
+ return clock.now();
+ }
+
+ @Override
+ public CompletableFuture<Void> waitFor(HybridTimestamp time) {
+ return safeTime.waitFor(time);
+ }
+
+ public void updateSafeTime(HybridTimestamp ts) {
+ this.safeTime.update(ts);
+ }
+
+ public void adjust(HybridTimestamp ts) {
Review Comment:
Little confuse name
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Cluster time implementation with additional methods to adjust time and
update safe time.
+ */
+public class ClusterTimeImpl implements ClusterTime {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(ClusterTimeImpl.class);
+
+ private final IgniteSpinBusyLock busyLock;
+
+ private volatile @Nullable LeaderTimer leaderTimer;
+
+ private final HybridClock clock;
+
+ private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+ /**
+ * Constructor.
+ *
+ * @param busyLock Busy lock.
+ * @param clock Node's hybrid clock.
+ */
+ public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+ this.busyLock = busyLock;
+ this.clock = clock;
+ this.safeTime = new PendingComparableValuesTracker<>(clock.now());
+ }
+
+ /**
+ * Starts sync time scheduler.
+ *
+ * @param service MetaStorage service that is used by scheduler to sync
time.
+ */
+ public void startLeaderTimer(MetaStorageServiceImpl service) {
+ if (!busyLock.enterBusy()) {
+ return;
+ }
+
+ try {
+ assert leaderTimer == null;
+
+ LeaderTimer newTimer = new LeaderTimer(service);
+
+ leaderTimer = newTimer;
+
+ newTimer.start();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Stops sync time scheduler.
+ */
+ public void stopLeaderTimer() {
+ LeaderTimer timer = leaderTimer;
+
+ assert timer != null;
+
+ timer.stop();
+
+ leaderTimer = null;
+ }
+
+ @Override
+ public HybridTimestamp now() {
+ return clock.now();
+ }
+
+ @Override
+ public CompletableFuture<Void> waitFor(HybridTimestamp time) {
+ return safeTime.waitFor(time);
+ }
+
+ public void updateSafeTime(HybridTimestamp ts) {
+ this.safeTime.update(ts);
+ }
+
+ public void adjust(HybridTimestamp ts) {
+ this.clock.update(ts);
+ }
+
+ private class LeaderTimer {
+
+ private final MetaStorageServiceImpl service;
+
+ /** Scheduled executor for cluster time sync. */
+ private final ScheduledExecutorService
scheduledClusterTimeSyncExecutor =
+ Executors.newScheduledThreadPool(1, new
NamedThreadFactory("scheduled-cluster-time-sync-thread", LOG));
Review Comment:
Please add nodeName
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Cluster time implementation with additional methods to adjust time and
update safe time.
+ */
+public class ClusterTimeImpl implements ClusterTime {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(ClusterTimeImpl.class);
+
+ private final IgniteSpinBusyLock busyLock;
+
+ private volatile @Nullable LeaderTimer leaderTimer;
+
+ private final HybridClock clock;
+
+ private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+ /**
+ * Constructor.
+ *
+ * @param busyLock Busy lock.
+ * @param clock Node's hybrid clock.
+ */
+ public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+ this.busyLock = busyLock;
+ this.clock = clock;
+ this.safeTime = new PendingComparableValuesTracker<>(clock.now());
+ }
+
+ /**
+ * Starts sync time scheduler.
+ *
+ * @param service MetaStorage service that is used by scheduler to sync
time.
+ */
+ public void startLeaderTimer(MetaStorageServiceImpl service) {
+ if (!busyLock.enterBusy()) {
+ return;
+ }
+
+ try {
+ assert leaderTimer == null;
+
+ LeaderTimer newTimer = new LeaderTimer(service);
+
+ leaderTimer = newTimer;
+
+ newTimer.start();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Stops sync time scheduler.
+ */
+ public void stopLeaderTimer() {
+ LeaderTimer timer = leaderTimer;
+
+ assert timer != null;
+
+ timer.stop();
+
+ leaderTimer = null;
+ }
+
+ @Override
+ public HybridTimestamp now() {
+ return clock.now();
+ }
+
+ @Override
+ public CompletableFuture<Void> waitFor(HybridTimestamp time) {
+ return safeTime.waitFor(time);
+ }
+
+ public void updateSafeTime(HybridTimestamp ts) {
+ this.safeTime.update(ts);
+ }
+
+ public void adjust(HybridTimestamp ts) {
+ this.clock.update(ts);
+ }
+
+ private class LeaderTimer {
Review Comment:
Why not separate class ?
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -66,6 +76,10 @@ class MetaStorageWriteHandler {
boolean handleWriteCommand(CommandClosure<WriteCommand> clo) {
WriteCommand command = clo.command();
+ if (command instanceof MetaStorageWriteCommand) {
+ clusterTime.updateSafeTime(((MetaStorageWriteCommand)
command).safeTime().asHybridTimestamp());
Review Comment:
I don't understand why this code.
##########
modules/network-api/src/main/java/org/apache/ignite/network/annotations/WithSetter.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.annotations;
+
+/**
+ * Annotation for the {@link Transferable} class methods. If a method is
marked with this annotation,
+ * a setter for the field with the same name as method's will be generated.
+ * In order to have access to this setter via interface one can use default
method:
+ * <pre>
+ * {@code @WithSetter
+ * HybridTimestampMessage safeTime();
+ *
+ * default void safeTime(HybridTimestampMessage safeTime) {
+ * // No-op.
+ * }
+ * }
+ * </pre>
+ * Note that fields with setters will not be final.
+ */
+public @interface WithSetter {
Review Comment:
Perhaps it would be more correct to add it in a separate ticket and test it
more thoroughly.
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -234,4 +252,16 @@ private static RevisionCondition.Type
toRevisionConditionType(ConditionType type
throw new IllegalArgumentException("Unexpected revision
condition type " + type);
}
}
+
+ void beforeApply(Command command) {
Review Comment:
At the moment, the code is very strange for me and not clear.
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java:
##########
@@ -84,7 +89,20 @@ public void onLeaderElected(long term) {
registerTopologyEventListeners();
// Update learner configuration (in case we missed some topology
updates) and initialize the serialization future.
- serializationFuture = executeIfLeaderImpl(this::resetLearners);
+ serializationFuture = executeWithStatus((service, term1, isLeader)
-> {
+ CompletableFuture<Void> fut;
+ if (isLeader) {
+ fut = this.resetLearners(service, term1);
+
+ clusterTime.startLeaderTimer(service);
+ } else {
+ fut = CompletableFuture.completedFuture(null);
Review Comment:
```suggestion
fut = completedFuture(null);
```
##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java:
##########
@@ -375,4 +382,40 @@ void
testLearnerLeaveAndJoinBecauseOfNetworkPartition(TestInfo testInfo) throws
assertTrue(waitForCondition(() ->
firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())),
10_000));
}
+
+ /**
+ * Tests that safe time is propagated from the leader to the
follower/learner.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testSafeTimePropagation(boolean useFollower, TestInfo testInfo)
throws Exception {
Review Comment:
There is not enough test to change the leader.
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command;
+
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.network.annotations.WithSetter;
+
+/** Base meta storage write command. */
+public interface MetaStorageWriteCommand extends WriteCommand {
+ /**
+ * Returns time on the initiator node.
+ */
+ HybridTimestampMessage initiatorTime();
+
+ /**
+ * This is a dirty hack. This time is set by the leader node to
disseminate new safe time across
+ * followers and learners.
+ */
+ @WithSetter
+ HybridTimestampMessage safeTime();
Review Comment:
While I don’t understand this dirty hack, we have a separate command for
time synchronization.
We need a very detailed description of this hack.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]