tkalkirill commented on code in PR #937:
URL: https://github.com/apache/ignite-3/pull/937#discussion_r926419883


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java:
##########
@@ -26,6 +26,7 @@
 /**
  * Table storage that contains meta, partitions and SQL indexes.
  */
+@Deprecated

Review Comment:
   It is probably necessary to specify what should be used instead.



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java:
##########
@@ -232,17 +235,28 @@ private <T extends Command> void handleActionRequest(
             BlockingQueue<CommandClosureEx<T>> queue,
             RaftGroupListener lsnr
     ) {
+        long index = req.command() instanceof ReadCommand ? 0 : 
appliedIndex.incrementAndGet();

Review Comment:
   ```suggestion
           long appliedIndex = req.command() instanceof ReadCommand ? 0 : 
appliedIndex.incrementAndGet();
   ```



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -499,14 +499,25 @@ public boolean hasNext() {
                     public CommandClosure<WriteCommand> next() {
                         @Nullable CommandClosure<WriteCommand> done = 
(CommandClosure<WriteCommand>) iter.done();
                         ByteBuffer data = iter.getData();
-                        WriteCommand command = 
JDKMarshaller.DEFAULT.unmarshall(data.array());
+
+                        WriteCommand command = done == null ? 
JDKMarshaller.DEFAULT.unmarshall(data.array()) : done.command();
+
+                        long index = iter.getIndex();

Review Comment:
   ```suggestion
                           long appliedIndex = iter.getIndex();
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -71,6 +72,14 @@ public class PageMemoryMvPartitionStorage implements 
MvPartitionStorage {
             ScanVersionChainByTimestamp::new
     );
 
+    /**
+     * Applied index value.
+     *
+     * @deprecated Not persistent, should be fixed later. TODO IGNITE-17077
+     */
+    @Deprecated
+    private long appliedIndex = 0;

Review Comment:
   Why not **volatile**?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -485,24 +486,38 @@ private void 
updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
                     assert internalTbl.storage() instanceof MvTableStorage :
                             "Only multi version storages are supported. 
Current storage is a " + internalTbl.storage().getClass().getName();
 
-                    futures[partId] = raftMgr.updateRaftGroup(
-                            partitionRaftGroupName(tblId, partId),
-                            newPartAssignment,
-                            // start new nodes, only if it is table creation
-                            // other cases will be covered by rebalance logic
-                            (oldPartAssignment.isEmpty()) ? newPartAssignment 
: Collections.emptyList(),
-                            () -> new PartitionListener(tblId,
-                                    new VersionedRowStore(((MvTableStorage) 
internalTbl.storage()).getOrCreateMvPartition(partId),
-                                            txManager)),
-                            () -> new RebalanceRaftGroupEventsListener(
-                                    metaStorageMgr,
-                                    
tablesCfg.tables().get(tablesById.get(tblId).name()),
-                                    partitionRaftGroupName(tblId, partId),
-                                    partId,
-                                    busyLock,
-                                    movePartition(() -> 
internalTbl.partitionRaftGroupService(partId)),
-                                    rebalanceScheduler),
-                            groupOptionsForInternalTable(internalTbl)
+                    // start new nodes, only if it is table creation
+                    // other cases will be covered by rebalance logic
+                    List<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? 
newPartAssignment : Collections.emptyList();
+
+                    String grpId = partitionRaftGroupName(tblId, partId);
+
+                    if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
+                        MvPartitionStorage partitionStorage = 
internalTbl.storage().getOrCreateMvPartition(partId);
+
+                        RaftGroupOptions groupOptions = 
groupOptionsForPartition(internalTbl, partitionStorage, newPartAssignment);
+
+                        raftMgr.startRaftGroupNode(
+                                grpId,
+                                newPartAssignment,
+                                new PartitionListener(tblId,
+                                        new VersionedRowStore(partitionStorage,
+                                                txManager)),

Review Comment:
   ```suggestion
                                           new 
VersionedRowStore(partitionStorage, txManager)),
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -268,16 +268,22 @@ public RocksDbMvPartitionStorage 
getOrCreateMvPartition(int partitionId) throws
     @Override
     public RocksDbMvPartitionStorage getMvPartition(int partitionId) {
         if (getPartition(partitionId) == null) {
-            throw new NullPointerException("Partition doesn't exist");
+            return null;
         }
 
-        return new RocksDbMvPartitionStorage(partitionId, db, 
partitionCf.handle());
+        return new RocksDbMvPartitionStorage(partitionId, db, 
partitionCf.handle(), meta.columnFamily().handle());
     }
 
     @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1325,14 +1341,33 @@ public boolean onUpdate(@NotNull WatchEvent evt) {
                                         + " [key={}, partition={}, table={}, 
localMemberAddress={}]",
                                 pendingAssignmentsWatchEvent.key(), part, 
tbl.name(), localMember.address());
 
-                        raftMgr.startRaftGroupNode(
-                                partId,
-                                assignments,
-                                deltaPeers,
-                                raftGrpLsnrSupplier,
-                                raftGrpEvtsLsnrSupplier,
-                                
groupOptionsForInternalTable(tbl.internalTable())
-                        );
+                        if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) {
+                            MvPartitionStorage partitionStorage = 
tbl.internalTable().storage().getOrCreateMvPartition(part);
+
+                            RaftGroupOptions groupOptions = 
groupOptionsForPartition(tbl.internalTable(), partitionStorage, assignments);
+
+                            RaftGroupListener raftGrpLsnr = new 
PartitionListener(
+                                    tblId,
+                                    new VersionedRowStore(partitionStorage, 
txManager)
+                            );
+
+                            RaftGroupEventsListener raftGrpEvtsLsnr = new 
RebalanceRaftGroupEventsListener(
+                                    metaStorageMgr,
+                                    tblCfg,
+                                    partId,
+                                    part,
+                                    busyLock,
+                                    movePartition(() -> 
tbl.internalTable().partitionRaftGroupService(part)),
+                                    rebalanceScheduler);

Review Comment:
   ```suggestion
                                       movePartition(() -> 
tbl.internalTable().partitionRaftGroupService(part)),
                                       rebalanceScheduler
                             );
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -268,16 +268,22 @@ public RocksDbMvPartitionStorage 
getOrCreateMvPartition(int partitionId) throws
     @Override
     public RocksDbMvPartitionStorage getMvPartition(int partitionId) {

Review Comment:
   ```suggestion
       public @Nullable RocksDbMvPartitionStorage getMvPartition(int 
partitionId) {
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java:
##########
@@ -69,6 +72,22 @@ public class VersionedRowStore {
     public VersionedRowStore(@NotNull MvPartitionStorage storage, @NotNull 
TxManager txManager) {
         this.storage = Objects.requireNonNull(storage);
         this.txManager = Objects.requireNonNull(txManager);
+
+        Set<RowId> ids = new HashSet<>();
+
+        storage.forEach((rowId, binaryRow) -> {
+            if (ids.add(rowId)) {
+                primaryIndex.put(binaryRow.keySlice(), rowId);
+            }
+        });
+    }
+
+    public void appliedIndex(long appliedIndex) {
+        storage.appliedIndex(appliedIndex);
+    }
+
+    public long appliedIndex() {

Review Comment:
   Please add Javadoc.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java:
##########
@@ -69,6 +72,22 @@ public class VersionedRowStore {
     public VersionedRowStore(@NotNull MvPartitionStorage storage, @NotNull 
TxManager txManager) {
         this.storage = Objects.requireNonNull(storage);
         this.txManager = Objects.requireNonNull(txManager);
+
+        Set<RowId> ids = new HashSet<>();
+
+        storage.forEach((rowId, binaryRow) -> {
+            if (ids.add(rowId)) {
+                primaryIndex.put(binaryRow.keySlice(), rowId);
+            }
+        });
+    }
+
+    public void appliedIndex(long appliedIndex) {

Review Comment:
   Please add Javadoc.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java:
##########
@@ -463,13 +464,16 @@ private TableImpl createTable(SchemaDescriptor schema) {
 
         TxManager txManager = new TxManagerImpl(clusterService, lockManager);
 
+        AtomicLong raftIndex = new AtomicLong();
+
         DummyInternalTableImpl table = new DummyInternalTableImpl(
                 new VersionedRowStore(new TestMvPartitionStorage(List.of(), 
0), txManager),
-                txManager);
+                txManager,
+                raftIndex);

Review Comment:
   ```suggestion
                   raftIndex
                   );
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java:
##########
@@ -718,13 +719,16 @@ private TableImpl createTable(SchemaDescriptor schema) {
 
         TxManager txManager = new TxManagerImpl(clusterService, lockManager);
 
+        AtomicLong raftIndex = new AtomicLong();
+
         DummyInternalTableImpl table = new DummyInternalTableImpl(
                 new VersionedRowStore(new TestMvPartitionStorage(List.of(), 
0), txManager),
-                txManager);
+                txManager,
+                raftIndex);

Review Comment:
   ```suggestion
                   raftIndex
                   );
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java:
##########
@@ -57,6 +57,13 @@ class RocksDbMetaStorage {
         this.metaCf = metaCf;
     }
 
+    /**
+     * Returns a column family instance, associated with the meta storage.
+     */
+    ColumnFamily columnFamily() {
+        return metaCf;

Review Comment:
   ```suggestion
           return metaColumnFamily;
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -71,6 +72,14 @@ public class PageMemoryMvPartitionStorage implements 
MvPartitionStorage {
             ScanVersionChainByTimestamp::new
     );
 
+    /**
+     * Applied index value.
+     *
+     * @deprecated Not persistent, should be fixed later. TODO IGNITE-17077
+     */
+    @Deprecated

Review Comment:
   Why not just use a **TODO** here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -485,24 +486,38 @@ private void 
updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
                     assert internalTbl.storage() instanceof MvTableStorage :
                             "Only multi version storages are supported. 
Current storage is a " + internalTbl.storage().getClass().getName();
 
-                    futures[partId] = raftMgr.updateRaftGroup(
-                            partitionRaftGroupName(tblId, partId),
-                            newPartAssignment,
-                            // start new nodes, only if it is table creation
-                            // other cases will be covered by rebalance logic
-                            (oldPartAssignment.isEmpty()) ? newPartAssignment 
: Collections.emptyList(),
-                            () -> new PartitionListener(tblId,
-                                    new VersionedRowStore(((MvTableStorage) 
internalTbl.storage()).getOrCreateMvPartition(partId),
-                                            txManager)),
-                            () -> new RebalanceRaftGroupEventsListener(
-                                    metaStorageMgr,
-                                    
tablesCfg.tables().get(tablesById.get(tblId).name()),
-                                    partitionRaftGroupName(tblId, partId),
-                                    partId,
-                                    busyLock,
-                                    movePartition(() -> 
internalTbl.partitionRaftGroupService(partId)),
-                                    rebalanceScheduler),
-                            groupOptionsForInternalTable(internalTbl)
+                    // start new nodes, only if it is table creation
+                    // other cases will be covered by rebalance logic
+                    List<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? 
newPartAssignment : Collections.emptyList();
+
+                    String grpId = partitionRaftGroupName(tblId, partId);
+
+                    if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
+                        MvPartitionStorage partitionStorage = 
internalTbl.storage().getOrCreateMvPartition(partId);
+
+                        RaftGroupOptions groupOptions = 
groupOptionsForPartition(internalTbl, partitionStorage, newPartAssignment);
+
+                        raftMgr.startRaftGroupNode(
+                                grpId,
+                                newPartAssignment,
+                                new PartitionListener(tblId,
+                                        new VersionedRowStore(partitionStorage,
+                                                txManager)),
+                                new RebalanceRaftGroupEventsListener(
+                                        metaStorageMgr,
+                                        
tablesCfg.tables().get(tablesById.get(tblId).name()),
+                                        grpId,
+                                        partId,
+                                        busyLock,
+                                        movePartition(() -> 
internalTbl.partitionRaftGroupService(partId)),
+                                        rebalanceScheduler),
+                                groupOptions

Review Comment:
   ```suggestion
                                           rebalanceScheduler
                                    ),
                                   groupOptions
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/UuidRowId.java:
##########
@@ -25,6 +25,7 @@
 /**
  * UUID-based ignite row id implementation.
  */
+@Deprecated

Review Comment:
   Why? What should be used instead?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/InitPartitionSnapshotReader.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.io.IOException;
+import java.util.Set;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+
+/**
+ * Snapshot reader used for raft group bootstrap. Reads initial state of the 
storage.
+ */
+class InitPartitionSnapshotReader extends SnapshotReader {
+    /** Instance of snapshot storage for shared fields access. */
+    private final PartitionSnapshotStorage snapshotStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param snapshotStorage Snapshot storage.
+     */
+    public InitPartitionSnapshotReader(PartitionSnapshotStorage 
snapshotStorage) {
+        this.snapshotStorage = snapshotStorage;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SnapshotMeta load() {
+        return snapshotStorage.snapshotMeta;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String getPath() {
+        return snapshotStorage.snapshotUri;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Set<String> listFiles() {
+        // No files in the snapshot.
+        return Set.of();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Message getFileMeta(String fileName) {

Review Comment:
   ```suggestion
       public @Nullable Message getFileMeta(String fileName) {
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotWriter.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.Set;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+
+/**
+ * Snapshot writer used for RAFT log truncation.
+ */
+class PartitionSnapshotWriter extends SnapshotWriter {
+    /** Instance of snapshot storage for shared fields access. */
+    private final PartitionSnapshotStorage snapshotStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param snapshotStorage Snapshot storage.
+     */
+    public PartitionSnapshotWriter(PartitionSnapshotStorage snapshotStorage) {
+        this.snapshotStorage = snapshotStorage;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean init(Void opts) {
+        // No-op.
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String getPath() {
+        return snapshotStorage.snapshotUri;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Set<String> listFiles() {
+        // No files in the snapshot.
+        return Set.of();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Message getFileMeta(String fileName) {

Review Comment:
   ```suggestion
       public @Nullable Message getFileMeta(String fileName) {
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java:
##########
@@ -118,12 +119,17 @@ public class InteropOperationsTest {
         TxManager txManager = new TxManagerImpl(clusterService, new 
HeapLockManager());
         txManager.start();
 
-        INT_TABLE = new DummyInternalTableImpl(new VersionedRowStore(new 
TestMvPartitionStorage(List.of(), 0), txManager), txManager);
+        AtomicLong raftIndex = new AtomicLong();
+
+        INT_TABLE = new DummyInternalTableImpl(
+                new VersionedRowStore(new TestMvPartitionStorage(List.of(), 
0), txManager),
+                txManager,
+                raftIndex);

Review Comment:
   ```suggestion
                   raftIndex
                   );
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/MessagingServiceTestUtils.java:
##########
@@ -88,13 +93,23 @@ public static MessagingService mockMessagingService(
         return messagingService;
     }
 
-    private static <T extends Command> Iterator<CommandClosure<T>> iterator(T 
obj) {
+    private static <T extends Command> Iterator<CommandClosure<T>> iterator(T 
obj, AtomicLong raftIndex) {
+        long index = raftIndex.incrementAndGet();

Review Comment:
   ```suggestion
           long appliedIndex = raftIndex.incrementAndGet();
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java:
##########
@@ -81,6 +82,8 @@ public DummyInternalTableImpl(VersionedRowStore store, 
TxManager txManager) {
                 invocationClose -> {
                     Command cmd = invocationClose.getArgument(0);
 
+                    long index = raftIndex.incrementAndGet();

Review Comment:
   ```suggestion
                       long appliedIndex = raftIndex.incrementAndGet();
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java:
##########
@@ -694,13 +695,16 @@ private TableImpl createTableImpl(SchemaDescriptor 
schema) {
 
         TxManager txManager = new TxManagerImpl(clusterService, lockManager);
 
+        AtomicLong raftIndex = new AtomicLong();
+
         DummyInternalTableImpl table = new DummyInternalTableImpl(
                 new VersionedRowStore(new TestMvPartitionStorage(List.of(), 
0), txManager),
-                txManager);
+                txManager,
+                raftIndex);

Review Comment:
   ```suggestion
                   raftIndex
                   );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to