tkalkirill commented on code in PR #937:
URL: https://github.com/apache/ignite-3/pull/937#discussion_r920785270


##########
modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java:
##########
@@ -28,6 +28,14 @@
  * @see RaftGroupListener
  */
 public interface CommandClosure<R extends Command> {
+    /**
+     * Corresponding log index of the command. Present for write commands only.
+     * Returns {@code 0} for read commands.
+     */
+    default long index() {

Review Comment:
   What about the name **appliedIndex**?



##########
modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java:
##########
@@ -63,7 +63,7 @@ public FakeInternalTable(String tableName, UUID tableId) {
 
     /** {@inheritDoc} */
     @Override
-    public @NotNull TableStorage storage() {
+    public @NotNull MvTableStorage storage() {

Review Comment:
   ```suggestion
       public MvTableStorage storage() {
   ```



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -30,6 +31,21 @@
  * <p>Each MvPartitionStorage instance represents exactly one partition.
  */
 public interface MvPartitionStorage extends AutoCloseable {
+    /**
+     * Last known replicator index. {@code 0} if index is unknown.
+     */
+    long appliedIndex();
+
+    /**
+     * Sets the last known replicator index.
+     */
+    void appliedIndex(long appliedIndex) throws StorageException;
+
+    /**
+     * {@link #appliedIndex()} value consistent with the data, already persisted on the storage.
+     */
+    long persistedIndex();

Review Comment:
   What about the name **persistedAppliedIndex**?
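
To make the distinction between the two values concrete, here is a minimal in-memory sketch. It is not the PR's code: the class name `IndexTrackingStorage` and the `flush()` method are hypothetical, and `persistedAppliedIndex()` uses the name proposed above. It only illustrates the Javadoc contract, where the persisted value lags behind `appliedIndex()` until the data reaches the storage.

```java
/** Hypothetical in-memory model of the two indexes; not part of the PR. */
final class IndexTrackingStorage {
    /** Last index applied in memory; 0 means "unknown". */
    private long appliedIndex;

    /** Applied index that was current at the last (simulated) flush. */
    private long persistedAppliedIndex;

    long appliedIndex() {
        return appliedIndex;
    }

    void appliedIndex(long appliedIndex) {
        this.appliedIndex = appliedIndex;
    }

    long persistedAppliedIndex() {
        return persistedAppliedIndex;
    }

    /** Assumption: stands in for whatever makes the data durable (checkpoint, RocksDB flush, and so on). */
    void flush() {
        persistedAppliedIndex = appliedIndex;
    }
}
```

In this sketch, `persistedAppliedIndex()` never exceeds `appliedIndex()` and only catches up when `flush()` is called.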



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -489,9 +491,17 @@ public boolean hasNext() {
                     public CommandClosure<WriteCommand> next() {
                         @Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>) iter.done();
                         ByteBuffer data = iter.getData();
-                        WriteCommand command = JDKMarshaller.DEFAULT.unmarshall(data.array());
+
+                        WriteCommand command = done == null ? JDKMarshaller.DEFAULT.unmarshall(data.array()) : done.command();
+
+                        long index = iter.getIndex();
 
                         return new CommandClosure<>() {
+                            @Override
+                            public long index() {
+                                return index;
+                            }

Review Comment:
   ```suggestion
                           return new CommandClosure<>() {
                               /** {@inheritDoc} */
                               @Override
                               public long index() {
                                   return index;
                               }
   ```



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
         }
     }
 
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestMvTableStorage.java:
##########
@@ -65,6 +64,11 @@ public CompletableFuture<?> destroyPartition(int partitionId) throws StorageExce
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
         }
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override
+    public void appliedIndex(long appliedIndex) throws StorageException {
+        assert appliedIndex > this.appliedIndex;
+
+        this.appliedIndex = appliedIndex;
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
         }
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
         );
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
         );
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override
+    public void appliedIndex(long appliedIndex) throws StorageException {
+        this.appliedIndex = appliedIndex;
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
         );
     }
 
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -467,6 +491,21 @@ private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable
         return new ScanCursor(treeCursor, keyFilter, transactionId, timestamp);
     }
 
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Partitions column family. */
     private final ColumnFamilyHandle cf;
 
+    /** Meta column family. */
+    private final ColumnFamilyHandle meta;
+
     /** Write options. */
     private final WriteOptions writeOpts = new WriteOptions();
 
     /** Upper bound for scans and reads. */
     private final Slice upperBound;
 
+    /** Key to store applied index value in meta. */
+    private byte[] appliedIndexKey;
+
     /**
      * Constructor.
      *
      * @param partitionId Partition id.
      * @param db RocksDB instance.
      * @param cf Column family handle to store partition data.
+     * @param meta Column family handle to store partition metadata.
      */
-    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
         this.partitionId = partitionId;
         this.db = db;
         this.cf = cf;
+        this.meta = meta;
 
         heapKeyBuffer = withInitial(() ->
                 ByteBuffer.allocate(MAX_KEY_SIZE)
                         .order(BIG_ENDIAN)
         );
 
         upperBound = new Slice(partitionEndPrefix());
+
+        appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public long appliedIndex() {
+        byte[] appliedIndexBytes;
+
+        try {
+
+            appliedIndexBytes = db.get(meta, appliedIndexKey);
+
+        } catch (RocksDBException e) {
+            throw new StorageException(e);
+        }
+
+        if (appliedIndexBytes == null) {
+            return 0;
+        }
+
+        return ByteUtils.bytesToLong(appliedIndexBytes);
+    }
+
+    @Override
+    public void appliedIndex(long appliedIndex) throws StorageException {
+        try {
+            db.put(meta, appliedIndexKey, ByteUtils.longToBytes(appliedIndex));
+        } catch (RocksDBException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Partitions column family. */
     private final ColumnFamilyHandle cf;
 
+    /** Meta column family. */
+    private final ColumnFamilyHandle meta;
+
     /** Write options. */
     private final WriteOptions writeOpts = new WriteOptions();
 
     /** Upper bound for scans and reads. */
     private final Slice upperBound;
 
+    /** Key to store applied index value in meta. */
+    private byte[] appliedIndexKey;
+
     /**
      * Constructor.
      *
      * @param partitionId Partition id.
      * @param db RocksDB instance.
      * @param cf Column family handle to store partition data.
+     * @param meta Column family handle to store partition metadata.
      */
-    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
         this.partitionId = partitionId;
         this.db = db;
         this.cf = cf;
+        this.meta = meta;
 
         heapKeyBuffer = withInitial(() ->
                 ByteBuffer.allocate(MAX_KEY_SIZE)
                         .order(BIG_ENDIAN)
         );
 
         upperBound = new Slice(partitionEndPrefix());
+
+        appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Partitions column family. */
     private final ColumnFamilyHandle cf;
 
+    /** Meta column family. */
+    private final ColumnFamilyHandle meta;
+
     /** Write options. */
     private final WriteOptions writeOpts = new WriteOptions();
 
     /** Upper bound for scans and reads. */
     private final Slice upperBound;
 
+    /** Key to store applied index value in meta. */
+    private byte[] appliedIndexKey;
+
     /**
      * Constructor.
      *
      * @param partitionId Partition id.
      * @param db RocksDB instance.
      * @param cf Column family handle to store partition data.
+     * @param meta Column family handle to store partition metadata.
      */
-    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
         this.partitionId = partitionId;
         this.db = db;
         this.cf = cf;
+        this.meta = meta;
 
         heapKeyBuffer = withInitial(() ->
                 ByteBuffer.allocate(MAX_KEY_SIZE)
                         .order(BIG_ENDIAN)
         );
 
         upperBound = new Slice(partitionEndPrefix());
+
+        appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public long appliedIndex() {
+        byte[] appliedIndexBytes;
+
+        try {
+
+            appliedIndexBytes = db.get(meta, appliedIndexKey);
+

Review Comment:
   ```suggestion
               appliedIndexBytes = db.get(meta, appliedIndexKey);
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Partitions column family. */
     private final ColumnFamilyHandle cf;
 
+    /** Meta column family. */
+    private final ColumnFamilyHandle meta;
+
     /** Write options. */
     private final WriteOptions writeOpts = new WriteOptions();
 
     /** Upper bound for scans and reads. */
     private final Slice upperBound;
 
+    /** Key to store applied index value in meta. */
+    private byte[] appliedIndexKey;
+
     /**
      * Constructor.
      *
      * @param partitionId Partition id.
      * @param db RocksDB instance.
      * @param cf Column family handle to store partition data.
+     * @param meta Column family handle to store partition metadata.
      */
-    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
         this.partitionId = partitionId;
         this.db = db;
         this.cf = cf;
+        this.meta = meta;
 
         heapKeyBuffer = withInitial(() ->
                 ByteBuffer.allocate(MAX_KEY_SIZE)
                         .order(BIG_ENDIAN)
         );
 
         upperBound = new Slice(partitionEndPrefix());
+
+        appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public long appliedIndex() {
+        byte[] appliedIndexBytes;
+
+        try {
+
+            appliedIndexBytes = db.get(meta, appliedIndexKey);
+
+        } catch (RocksDBException e) {
+            throw new StorageException(e);
+        }
+
+        if (appliedIndexBytes == null) {
+            return 0;
+        }
+
+        return ByteUtils.bytesToLong(appliedIndexBytes);
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```
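
For readers unfamiliar with how the getter above maps a stored value to an index, here is a minimal sketch of the same round trip. It deliberately uses `java.nio.ByteBuffer` rather than the `ByteUtils` helper from the diff, and the class name `AppliedIndexCodec` is hypothetical. It assumes a fixed 8-byte big-endian encoding, which may or may not match what `ByteUtils` produces.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper, not part of the PR: encodes an applied index as 8 big-endian bytes. */
final class AppliedIndexCodec {
    private AppliedIndexCodec() {
    }

    /** Encodes the index into a fixed-width 8-byte array (ByteBuffer is big-endian by default). */
    static byte[] encode(long appliedIndex) {
        return ByteBuffer.allocate(Long.BYTES).putLong(appliedIndex).array();
    }

    /** Decodes a stored value; a missing value (null) maps to 0, meaning "index unknown". */
    static long decode(byte[] stored) {
        if (stored == null) {
            return 0L;
        }

        return ByteBuffer.wrap(stored).getLong();
    }
}
```

The `null` branch mirrors the `appliedIndexBytes == null` check in the diff: a key that was never written reads back as index `0`.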



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -528,6 +574,67 @@ private void incrementRowId(ByteBuffer buf) {
         };
     }
 
+    // TODO IGNITE-16769 Implement correct PartitionStorage rows count calculation.
+    @Override
+    public long rowsCount() {
+        try (
+                var upperBound = new Slice(partitionEndPrefix());
+                var options = new ReadOptions().setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, options)
+        ) {
+            it.seek(partitionStartPrefix());
+
+            long size = 0;
+
+            while (it.isValid()) {
+                ++size;
+                it.next();
+            }
+
+            return size;
+        }
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -71,6 +72,14 @@ public class PageMemoryMvPartitionStorage implements MvPartitionStorage {
             ScanVersionChainByTimestamp::new
     );
 
+    /**
+     * Applied index value.
+     *
+     * @deprecated Not persistent, should be fixed later.

Review Comment:
   When and where is "later"? Please add a ticket.



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
         }
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override
+    public void appliedIndex(long appliedIndex) throws StorageException {
+        assert appliedIndex > this.appliedIndex;

Review Comment:
   ```suggestion
           assert appliedIndex > this.appliedIndex : "current=" + this.appliedIndex + ", new=" + appliedIndex;
   ```
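
A minimal standalone sketch of why the message helps. The class name `MonotonicAppliedIndex` is hypothetical and this is not the test storage from the PR; it only reuses the assertion from the suggestion above.

```java
/** Hypothetical holder that only accepts strictly increasing applied indexes. */
final class MonotonicAppliedIndex {
    private long appliedIndex;

    long appliedIndex() {
        return appliedIndex;
    }

    void appliedIndex(long appliedIndex) {
        // The detail message is only built when the check fails, and only when assertions are enabled.
        assert appliedIndex > this.appliedIndex : "current=" + this.appliedIndex + ", new=" + appliedIndex;

        this.appliedIndex = appliedIndex;
    }
}
```

With assertions enabled (`java -ea`), calling `appliedIndex(3)` after `appliedIndex(5)` fails with an `AssertionError` whose message is `current=5, new=3`, so both values are visible in the failure without a debugger. Without the message, the error carries no detail at all.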



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -118,4 +134,25 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @throws StorageException If failed to read data from the storage.
      */
     Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException;
+
+    /**
+     * Returns rows count belongs to current storage.
+     *
+     * @return Rows count.
+     * @throws StorageException If failed to obtain size.
+     * @deprecated It's not yet defined what a "count" is. This value is not easily defined for multiversioned storages.
+     *
+     */

Review Comment:
   ```suggestion
   
       /**
        * Returns rows count belongs to current storage.
        *
        * @return Rows count.
        * @throws StorageException If failed to obtain size.
        * @deprecated It's not yet defined what a "count" is. This value is not easily defined for multiversioned storages.
        */
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/MessagingServiceTestUtils.java:
##########
@@ -88,8 +93,15 @@ public static MessagingService mockMessagingService(
         return messagingService;
     }
 
-    private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) {
+    private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj, AtomicLong raftIndex) {
+        long index = raftIndex.incrementAndGet();
+
         CommandClosure<T> closure = new CommandClosure<>() {
+            @Override

Review Comment:
   ```suggestion
           CommandClosure<T> closure = new CommandClosure<>() {
               /** {@inheritDoc} */
               @Override
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java:
##########
@@ -109,6 +112,11 @@ public void result(@Nullable Serializable r) {
                                     }
                                 } else {
                                     CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                                        @Override
+                                        public long index() {

Review Comment:
   ```suggestion
                                           /** {@inheritDoc} */
                                           @Override
                                           public long index() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.