ascherbakoff commented on code in PR #937:
URL: https://github.com/apache/ignite-3/pull/937#discussion_r927652761
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -543,9 +554,13 @@ public void onSnapshotSave(SnapshotWriter writer, Closure
done) {
if (res == null) {
File file = new File(writer.getPath());
- for (File file0 : file.listFiles()) {
- if (file0.isFile()) {
- writer.addFile(file0.getName(), null);
+ File[] snapshotFiles = file.listFiles();
+
+ if (snapshotFiles != null) {
Review Comment:
How can this be NULL ?
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -30,6 +31,21 @@
* <p>Each MvPartitionStorage instance represents exactly one partition.
*/
public interface MvPartitionStorage extends AutoCloseable {
Review Comment:
Should PartitionStorage be deprecated ?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.List;
+import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+
+/**
+ * Snapshot storage factory for {@link MvPartitionStorage}. Utilizes the fact
that every partition already stores its latest applied index
+ * and thus can inself be used as its own snapshot.
+ *
+ * <p/>Uses {@link MvPartitionStorage#persistedIndex()} and configuration,
passed into constructor, to create a {@link SnapshotMeta} object
+ * in {@link SnapshotReader#load()}.
+ *
+ * <p/>Snapshot writer doesn't allow explicit save of any actual file. {@link
SnapshotWriter#saveMeta(SnapshotMeta)} simply returns
+ * {@code true}, and {@link SnapshotWriter#addFile(String)} throws an
exception.
+ */
+public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory
{
+ /** Partition storage. */
+ private final MvPartitionStorage partitionStorage;
+
+ /** List of peers. */
+ private final List<String> peers;
+
+ /** List of learners. */
+ private final List<String> learners;
+
+ /** RAFT log index read from {@link MvPartitionStorage#persistedIndex()}
during factory instantiation. */
+ private final long persistedRaftIndex;
+
+ /**
+ * Constructor.
+ *
+ * @param partitionStorage MV partition storage.
+ * @param peers List of raft group peers to be used in snapshot meta.
+ * @param learners List of raft group learners to be used in snapshot meta.
+ *
+ * @see SnapshotMeta
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public PartitionSnapshotStorageFactory(MvPartitionStorage
partitionStorage, List<String> peers, List<String> learners) {
+ this.partitionStorage = partitionStorage;
+ this.peers = peers;
+ this.learners = learners;
+
+ persistedRaftIndex = partitionStorage.persistedIndex();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SnapshotStorage createSnapshotStorage(String uri, RaftOptions
raftOptions) {
+ SnapshotMeta snapshotMeta = new RaftMessagesFactory().snapshotMeta()
+ .lastIncludedIndex(persistedRaftIndex)
+ // According to the code of
org.apache.ignite.raft.jraft.core.NodeImpl.bootstrap, it's "dangerous" to init
term with a value
+ // greater than 1.
+ .lastIncludedTerm(persistedRaftIndex > 0 ? 1 : 0)
Review Comment:
Can persistedRaftIndex be == 0? I suppose not when restoring from a snapshot.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -404,11 +416,13 @@ private boolean handleFinishTxCommand(FinishTxCommand
cmd) {
boolean stateChanged = txManager.changeState(txId, TxState.PENDING,
cmd.finish() ? TxState.COMMITED : TxState.ABORTED);
- if (txManager.state(txId) == TxState.COMMITED) {
+ // This code is technically incorrect and assumes that "stateChanged"
is always true. This was done because transaction state is not
+ // persisted and thus FinishTxCommand couldn't be completed on
recovery after node restart ("changeState" uses "replace").
+ if (/*txManager.state(txId) == TxState.COMMITED*/cmd.finish()) {
cmd.lockedKeys().getOrDefault(lockId, new
ArrayList<>()).forEach(key -> {
Review Comment:
Why is a new object created here?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java:
##########
@@ -253,106 +260,57 @@ public void startRaftGroupNode(
try {
List<Peer> peers = nodes.stream().map(n -> new
Peer(n.address())).collect(Collectors.toList());
- String locNodeName =
clusterNetSvc.topologyService().localMember().name();
+ LOG.info("Start new raft node for group={} with initial peers={}",
grpId, peers);
- if (deltaNodes.stream().anyMatch(n ->
locNodeName.equals(n.name()))) {
- LOG.info("Start new raft node for group={} with initial
peers={}", grpId, peers);
+ if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnr, lsnr,
peers, groupOptions)) {
+ String locNodeName =
clusterNetSvc.topologyService().localMember().name();
- if (!raftServer.startRaftGroup(grpId,
raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers, groupOptions)) {
- throw new
IgniteInternalException(IgniteStringFormatter.format(
- "Raft group on the node is already started
[node={}, raftGrp={}]",
- locNodeName,
- grpId
- ));
- }
+ throw new IgniteInternalException(IgniteStringFormatter.format(
+ "Raft group on the node is already started [node={},
raftGrp={}]",
+ locNodeName,
Review Comment:
The locNodeName variable does not seem to be needed.
##########
modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java:
##########
@@ -28,6 +28,14 @@
* @see RaftGroupListener
*/
public interface CommandClosure<R extends Command> {
+ /**
+ * Corresponding log index of the command. Present for write commands only.
+ * Returns {@code 0} for read commands.
+ */
+ default long lastAppliedIndex() {
Review Comment:
This is not an applied index, it's a committed index. Read commands have a
committed index as well, see
org.apache.ignite.raft.jraft.closure.ReadIndexClosure#run(org.apache.ignite.raft.jraft.Status,
long, byte[])
I suggest renaming it to just index().
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -118,4 +134,24 @@ public interface MvPartitionStorage extends AutoCloseable {
* @throws StorageException If failed to read data from the storage.
*/
Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp
timestamp) throws StorageException;
+
+ /**
+ * Returns rows count belongs to current storage.
+ *
+ * @return Rows count.
+ * @throws StorageException If failed to obtain size.
+ * @deprecated It's not yet defined what a "count" is. This value is not
easily defined for multiversioned storages.
+ */
+ @Deprecated
Review Comment:
Do we need a linked ticket ?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.List;
+import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+
+/**
+ * Snapshot storage factory for {@link MvPartitionStorage}. Utilizes the fact
that every partition already stores its latest applied index
+ * and thus can inself be used as its own snapshot.
+ *
+ * <p/>Uses {@link MvPartitionStorage#persistedIndex()} and configuration,
passed into constructor, to create a {@link SnapshotMeta} object
+ * in {@link SnapshotReader#load()}.
+ *
+ * <p/>Snapshot writer doesn't allow explicit save of any actual file. {@link
SnapshotWriter#saveMeta(SnapshotMeta)} simply returns
+ * {@code true}, and {@link SnapshotWriter#addFile(String)} throws an
exception.
+ */
+public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory
{
+ /** Partition storage. */
+ private final MvPartitionStorage partitionStorage;
+
+ /** List of peers. */
+ private final List<String> peers;
+
+ /** List of learners. */
+ private final List<String> learners;
+
+ /** RAFT log index read from {@link MvPartitionStorage#persistedIndex()}
during factory instantiation. */
+ private final long persistedRaftIndex;
+
+ /**
+ * Constructor.
+ *
+ * @param partitionStorage MV partition storage.
+ * @param peers List of raft group peers to be used in snapshot meta.
+ * @param learners List of raft group learners to be used in snapshot meta.
+ *
+ * @see SnapshotMeta
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public PartitionSnapshotStorageFactory(MvPartitionStorage
partitionStorage, List<String> peers, List<String> learners) {
+ this.partitionStorage = partitionStorage;
+ this.peers = peers;
+ this.learners = learners;
+
+ persistedRaftIndex = partitionStorage.persistedIndex();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SnapshotStorage createSnapshotStorage(String uri, RaftOptions
raftOptions) {
+ SnapshotMeta snapshotMeta = new RaftMessagesFactory().snapshotMeta()
+ .lastIncludedIndex(persistedRaftIndex)
+ // According to the code of
org.apache.ignite.raft.jraft.core.NodeImpl.bootstrap, it's "dangerous" to init
term with a value
Review Comment:
How is it dangerous? ;)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]