tkalkirill commented on code in PR #937:
URL: https://github.com/apache/ignite-3/pull/937#discussion_r926419883
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java:
##########
@@ -26,6 +26,7 @@
/**
* Table storage that contains meta, partitions and SQL indexes.
*/
+@Deprecated
Review Comment:
It is probably worth specifying what should be used instead.
##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java:
##########
@@ -232,17 +235,28 @@ private <T extends Command> void handleActionRequest(
BlockingQueue<CommandClosureEx<T>> queue,
RaftGroupListener lsnr
) {
+ long index = req.command() instanceof ReadCommand ? 0 :
appliedIndex.incrementAndGet();
Review Comment:
```suggestion
long appliedIndex = req.command() instanceof ReadCommand ? 0 :
this.appliedIndex.incrementAndGet();
```
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -499,14 +499,25 @@ public boolean hasNext() {
public CommandClosure<WriteCommand> next() {
@Nullable CommandClosure<WriteCommand> done =
(CommandClosure<WriteCommand>) iter.done();
ByteBuffer data = iter.getData();
- WriteCommand command =
JDKMarshaller.DEFAULT.unmarshall(data.array());
+
+ WriteCommand command = done == null ?
JDKMarshaller.DEFAULT.unmarshall(data.array()) : done.command();
+
+ long index = iter.getIndex();
Review Comment:
```suggestion
long appliedIndex = iter.getIndex();
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -71,6 +72,14 @@ public class PageMemoryMvPartitionStorage implements
MvPartitionStorage {
ScanVersionChainByTimestamp::new
);
+ /**
+ * Applied index value.
+ *
+ * @deprecated Not persistent, should be fixed later. TODO IGNITE-17077
+ */
+ @Deprecated
+ private long appliedIndex = 0;
Review Comment:
Why not **volatile**?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -485,24 +486,38 @@ private void
updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
assert internalTbl.storage() instanceof MvTableStorage :
"Only multi version storages are supported.
Current storage is a " + internalTbl.storage().getClass().getName();
- futures[partId] = raftMgr.updateRaftGroup(
- partitionRaftGroupName(tblId, partId),
- newPartAssignment,
- // start new nodes, only if it is table creation
- // other cases will be covered by rebalance logic
- (oldPartAssignment.isEmpty()) ? newPartAssignment
: Collections.emptyList(),
- () -> new PartitionListener(tblId,
- new VersionedRowStore(((MvTableStorage)
internalTbl.storage()).getOrCreateMvPartition(partId),
- txManager)),
- () -> new RebalanceRaftGroupEventsListener(
- metaStorageMgr,
-
tablesCfg.tables().get(tablesById.get(tblId).name()),
- partitionRaftGroupName(tblId, partId),
- partId,
- busyLock,
- movePartition(() ->
internalTbl.partitionRaftGroupService(partId)),
- rebalanceScheduler),
- groupOptionsForInternalTable(internalTbl)
+ // start new nodes, only if it is table creation
+ // other cases will be covered by rebalance logic
+ List<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ?
newPartAssignment : Collections.emptyList();
+
+ String grpId = partitionRaftGroupName(tblId, partId);
+
+ if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
+ MvPartitionStorage partitionStorage =
internalTbl.storage().getOrCreateMvPartition(partId);
+
+ RaftGroupOptions groupOptions =
groupOptionsForPartition(internalTbl, partitionStorage, newPartAssignment);
+
+ raftMgr.startRaftGroupNode(
+ grpId,
+ newPartAssignment,
+ new PartitionListener(tblId,
+ new VersionedRowStore(partitionStorage,
+ txManager)),
Review Comment:
```suggestion
new
VersionedRowStore(partitionStorage, txManager)),
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -268,16 +268,22 @@ public RocksDbMvPartitionStorage
getOrCreateMvPartition(int partitionId) throws
@Override
public RocksDbMvPartitionStorage getMvPartition(int partitionId) {
if (getPartition(partitionId) == null) {
- throw new NullPointerException("Partition doesn't exist");
+ return null;
}
- return new RocksDbMvPartitionStorage(partitionId, db,
partitionCf.handle());
+ return new RocksDbMvPartitionStorage(partitionId, db,
partitionCf.handle(), meta.columnFamily().handle());
}
@Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1325,14 +1341,33 @@ public boolean onUpdate(@NotNull WatchEvent evt) {
+ " [key={}, partition={}, table={},
localMemberAddress={}]",
pendingAssignmentsWatchEvent.key(), part,
tbl.name(), localMember.address());
- raftMgr.startRaftGroupNode(
- partId,
- assignments,
- deltaPeers,
- raftGrpLsnrSupplier,
- raftGrpEvtsLsnrSupplier,
-
groupOptionsForInternalTable(tbl.internalTable())
- );
+ if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) {
+ MvPartitionStorage partitionStorage =
tbl.internalTable().storage().getOrCreateMvPartition(part);
+
+ RaftGroupOptions groupOptions =
groupOptionsForPartition(tbl.internalTable(), partitionStorage, assignments);
+
+ RaftGroupListener raftGrpLsnr = new
PartitionListener(
+ tblId,
+ new VersionedRowStore(partitionStorage,
txManager)
+ );
+
+ RaftGroupEventsListener raftGrpEvtsLsnr = new
RebalanceRaftGroupEventsListener(
+ metaStorageMgr,
+ tblCfg,
+ partId,
+ part,
+ busyLock,
+ movePartition(() ->
tbl.internalTable().partitionRaftGroupService(part)),
+ rebalanceScheduler);
Review Comment:
```suggestion
movePartition(() ->
tbl.internalTable().partitionRaftGroupService(part)),
rebalanceScheduler
);
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -268,16 +268,22 @@ public RocksDbMvPartitionStorage
getOrCreateMvPartition(int partitionId) throws
@Override
public RocksDbMvPartitionStorage getMvPartition(int partitionId) {
Review Comment:
```suggestion
public @Nullable RocksDbMvPartitionStorage getMvPartition(int
partitionId) {
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java:
##########
@@ -69,6 +72,22 @@ public class VersionedRowStore {
public VersionedRowStore(@NotNull MvPartitionStorage storage, @NotNull
TxManager txManager) {
this.storage = Objects.requireNonNull(storage);
this.txManager = Objects.requireNonNull(txManager);
+
+ Set<RowId> ids = new HashSet<>();
+
+ storage.forEach((rowId, binaryRow) -> {
+ if (ids.add(rowId)) {
+ primaryIndex.put(binaryRow.keySlice(), rowId);
+ }
+ });
+ }
+
+ public void appliedIndex(long appliedIndex) {
+ storage.appliedIndex(appliedIndex);
+ }
+
+ public long appliedIndex() {
Review Comment:
Please add Javadoc for this method.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java:
##########
@@ -69,6 +72,22 @@ public class VersionedRowStore {
public VersionedRowStore(@NotNull MvPartitionStorage storage, @NotNull
TxManager txManager) {
this.storage = Objects.requireNonNull(storage);
this.txManager = Objects.requireNonNull(txManager);
+
+ Set<RowId> ids = new HashSet<>();
+
+ storage.forEach((rowId, binaryRow) -> {
+ if (ids.add(rowId)) {
+ primaryIndex.put(binaryRow.keySlice(), rowId);
+ }
+ });
+ }
+
+ public void appliedIndex(long appliedIndex) {
Review Comment:
Please add Javadoc for this method.
##########
modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java:
##########
@@ -463,13 +464,16 @@ private TableImpl createTable(SchemaDescriptor schema) {
TxManager txManager = new TxManagerImpl(clusterService, lockManager);
+ AtomicLong raftIndex = new AtomicLong();
+
DummyInternalTableImpl table = new DummyInternalTableImpl(
new VersionedRowStore(new TestMvPartitionStorage(List.of(),
0), txManager),
- txManager);
+ txManager,
+ raftIndex);
Review Comment:
```suggestion
raftIndex
);
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java:
##########
@@ -718,13 +719,16 @@ private TableImpl createTable(SchemaDescriptor schema) {
TxManager txManager = new TxManagerImpl(clusterService, lockManager);
+ AtomicLong raftIndex = new AtomicLong();
+
DummyInternalTableImpl table = new DummyInternalTableImpl(
new VersionedRowStore(new TestMvPartitionStorage(List.of(),
0), txManager),
- txManager);
+ txManager,
+ raftIndex);
Review Comment:
```suggestion
raftIndex
);
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java:
##########
@@ -57,6 +57,13 @@ class RocksDbMetaStorage {
this.metaCf = metaCf;
}
+ /**
+ * Returns a column family instance, associated with the meta storage.
+ */
+ ColumnFamily columnFamily() {
+ return metaCf;
Review Comment:
```suggestion
return metaColumnFamily;
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -71,6 +72,14 @@ public class PageMemoryMvPartitionStorage implements
MvPartitionStorage {
ScanVersionChainByTimestamp::new
);
+ /**
+ * Applied index value.
+ *
+ * @deprecated Not persistent, should be fixed later. TODO IGNITE-17077
+ */
+ @Deprecated
Review Comment:
Why not just use a **TODO** there?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -485,24 +486,38 @@ private void
updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
assert internalTbl.storage() instanceof MvTableStorage :
"Only multi version storages are supported.
Current storage is a " + internalTbl.storage().getClass().getName();
- futures[partId] = raftMgr.updateRaftGroup(
- partitionRaftGroupName(tblId, partId),
- newPartAssignment,
- // start new nodes, only if it is table creation
- // other cases will be covered by rebalance logic
- (oldPartAssignment.isEmpty()) ? newPartAssignment
: Collections.emptyList(),
- () -> new PartitionListener(tblId,
- new VersionedRowStore(((MvTableStorage)
internalTbl.storage()).getOrCreateMvPartition(partId),
- txManager)),
- () -> new RebalanceRaftGroupEventsListener(
- metaStorageMgr,
-
tablesCfg.tables().get(tablesById.get(tblId).name()),
- partitionRaftGroupName(tblId, partId),
- partId,
- busyLock,
- movePartition(() ->
internalTbl.partitionRaftGroupService(partId)),
- rebalanceScheduler),
- groupOptionsForInternalTable(internalTbl)
+ // start new nodes, only if it is table creation
+ // other cases will be covered by rebalance logic
+ List<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ?
newPartAssignment : Collections.emptyList();
+
+ String grpId = partitionRaftGroupName(tblId, partId);
+
+ if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
+ MvPartitionStorage partitionStorage =
internalTbl.storage().getOrCreateMvPartition(partId);
+
+ RaftGroupOptions groupOptions =
groupOptionsForPartition(internalTbl, partitionStorage, newPartAssignment);
+
+ raftMgr.startRaftGroupNode(
+ grpId,
+ newPartAssignment,
+ new PartitionListener(tblId,
+ new VersionedRowStore(partitionStorage,
+ txManager)),
+ new RebalanceRaftGroupEventsListener(
+ metaStorageMgr,
+
tablesCfg.tables().get(tablesById.get(tblId).name()),
+ grpId,
+ partId,
+ busyLock,
+ movePartition(() ->
internalTbl.partitionRaftGroupService(partId)),
+ rebalanceScheduler),
+ groupOptions
Review Comment:
```suggestion
rebalanceScheduler
),
groupOptions
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/UuidRowId.java:
##########
@@ -25,6 +25,7 @@
/**
* UUID-based ignite row id implementation.
*/
+@Deprecated
Review Comment:
Why? What should be used instead?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/InitPartitionSnapshotReader.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.io.IOException;
+import java.util.Set;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+
+/**
+ * Snapshot reader used for raft group bootstrap. Reads initial state of the
storage.
+ */
+class InitPartitionSnapshotReader extends SnapshotReader {
+ /** Instance of snapshot storage for shared fields access. */
+ private final PartitionSnapshotStorage snapshotStorage;
+
+ /**
+ * Constructor.
+ *
+ * @param snapshotStorage Snapshot storage.
+ */
+ public InitPartitionSnapshotReader(PartitionSnapshotStorage
snapshotStorage) {
+ this.snapshotStorage = snapshotStorage;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SnapshotMeta load() {
+ return snapshotStorage.snapshotMeta;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getPath() {
+ return snapshotStorage.snapshotUri;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Set<String> listFiles() {
+ // No files in the snapshot.
+ return Set.of();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Message getFileMeta(String fileName) {
Review Comment:
```suggestion
public @Nullable Message getFileMeta(String fileName) {
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotWriter.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.Set;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+
+/**
+ * Snapshot writer used for RAFT log truncation.
+ */
+class PartitionSnapshotWriter extends SnapshotWriter {
+ /** Instance of snapshot storage for shared fields access. */
+ private final PartitionSnapshotStorage snapshotStorage;
+
+ /**
+ * Constructor.
+ *
+ * @param snapshotStorage Snapshot storage.
+ */
+ public PartitionSnapshotWriter(PartitionSnapshotStorage snapshotStorage) {
+ this.snapshotStorage = snapshotStorage;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean init(Void opts) {
+ // No-op.
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getPath() {
+ return snapshotStorage.snapshotUri;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Set<String> listFiles() {
+ // No files in the snapshot.
+ return Set.of();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Message getFileMeta(String fileName) {
Review Comment:
```suggestion
public @Nullable Message getFileMeta(String fileName) {
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java:
##########
@@ -118,12 +119,17 @@ public class InteropOperationsTest {
TxManager txManager = new TxManagerImpl(clusterService, new
HeapLockManager());
txManager.start();
- INT_TABLE = new DummyInternalTableImpl(new VersionedRowStore(new
TestMvPartitionStorage(List.of(), 0), txManager), txManager);
+ AtomicLong raftIndex = new AtomicLong();
+
+ INT_TABLE = new DummyInternalTableImpl(
+ new VersionedRowStore(new TestMvPartitionStorage(List.of(),
0), txManager),
+ txManager,
+ raftIndex);
Review Comment:
```suggestion
raftIndex
);
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/MessagingServiceTestUtils.java:
##########
@@ -88,13 +93,23 @@ public static MessagingService mockMessagingService(
return messagingService;
}
- private static <T extends Command> Iterator<CommandClosure<T>> iterator(T
obj) {
+ private static <T extends Command> Iterator<CommandClosure<T>> iterator(T
obj, AtomicLong raftIndex) {
+ long index = raftIndex.incrementAndGet();
Review Comment:
```suggestion
long appliedIndex = raftIndex.incrementAndGet();
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java:
##########
@@ -81,6 +82,8 @@ public DummyInternalTableImpl(VersionedRowStore store,
TxManager txManager) {
invocationClose -> {
Command cmd = invocationClose.getArgument(0);
+ long index = raftIndex.incrementAndGet();
Review Comment:
```suggestion
long appliedIndex = raftIndex.incrementAndGet();
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java:
##########
@@ -694,13 +695,16 @@ private TableImpl createTableImpl(SchemaDescriptor
schema) {
TxManager txManager = new TxManagerImpl(clusterService, lockManager);
+ AtomicLong raftIndex = new AtomicLong();
+
DummyInternalTableImpl table = new DummyInternalTableImpl(
new VersionedRowStore(new TestMvPartitionStorage(List.of(),
0), txManager),
- txManager);
+ txManager,
+ raftIndex);
Review Comment:
```suggestion
raftIndex
);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]