vldpyatkov commented on code in PR #1230:
URL: https://github.com/apache/ignite-3/pull/1230#discussion_r1000410851


##########
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java:
##########
@@ -210,9 +242,26 @@ public interface InternalTable extends AutoCloseable {
      * @param p  The partition.
      * @param tx The transaction.
      * @return {@link Publisher} that reactively notifies about partition rows.
+     * @throws IllegalArgumentException If proposed partition index {@code p} 
is out of bounds.
      */
     Publisher<BinaryRow> scan(int p, @Nullable InternalTransaction tx);
 
+    /**
+     * Scans given partition within the context of a read-only transaction, 
providing {@link Publisher} that reactively notifies about
+     * partition rows.
+     *
+     * @param p  The partition.
+     * @param tx The transaction.
+     * @return {@link Publisher} that reactively notifies about partition rows.
+     * @throws IllegalArgumentException If proposed partition index {@code p} 
is out of bounds.
+     * @throws TransactionException If proposed {@code tx} is read-write. 
Transaction itself won't be automatically rolled back.
+     */
+    Publisher<BinaryRow> scan(
+            int p,
+            @Nullable InternalTransaction tx,
+            @NotNull ClusterNode recipientNode

Review Comment:
   Add the argument to javadoc.



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.function.Executable;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link InternalTable} read-only operations.
+ */
+@ExtendWith(MockitoExtension.class)
+public class ItInternalTableReadOnlyOperationsTest {

Review Comment:
   I think, we should inherit test class at least from BaseIgniteAbstractTest



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -337,33 +339,59 @@ private CompletableFuture<Object> 
processReadOnlyScanRetrieveBatchAction(ReadOnl
     private CompletableFuture<Object> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
         ByteBuffer searchKey = request.binaryRow().keySlice();
 
-        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
-
         if (request.requestType() != RequestType.RO_GET) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                     IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
         }
 
         //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along 
with proper lock management on search rows.
-        RowId rowId = rowIdByKey(indexId, searchKey);
+        HybridTimestamp readTimestamp = request.readTimestamp();
+
+        try (PartitionTimestampCursor scan = 
mvDataStorage.scan(readTimestamp)) {
+            while (scan.hasNext()) {
+                ReadResult readResult = scan.next();
+                if (readResult.binaryRow() == null) {
+                    HybridTimestamp newestCommitTimestamp = 
readResult.newestCommitTimestamp();
+                    if (newestCommitTimestamp == null) {
+                        throw new AssertionError("Unexpected null value of the 
newest committed timestamp.");
+                    }
 
-        ReadResult readResult = rowId == null ? null : 
mvDataStorage.read(rowId, request.timestamp());
+                    BinaryRow candidate = 
scan.committed(readResult.newestCommitTimestamp());

Review Comment:
   Try to use readTimestamp here, possibly it will make the code simpler.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -376,34 +404,61 @@ private CompletableFuture<Object> 
processReadOnlyMultiEntryAction(ReadOnlyMultiR
         Collection<ByteBuffer> keyRows = request.binaryRows().stream().map(br 
-> br.keySlice()).collect(
                 Collectors.toList());
 
-        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
-
-        if (request.requestType() != RequestType.RO_GET_ALL) {
+        if (request.requestType() !=  RequestType.RO_GET_ALL) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                     IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
         }
 
         ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size());
 
-        for (ByteBuffer searchKey : keyRows) {
-            //TODO: IGNITE-17868 Integrate indexes into rowIds resolution 
along with proper lock management on search rows.
-            RowId rowId = rowIdByKey(indexId, searchKey);
-
-            ReadResult readResult = rowId == null ? null : 
mvDataStorage.read(rowId, request.timestamp());
+        //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along 
with proper lock management on search rows.
+        HybridTimestamp readTimestamp = request.readTimestamp();
 
-            result.add(readResult == null ? null : 
resolveReadResult(readResult, request.timestamp(), () -> {
-                if (readResult.newestCommitTimestamp() == null) {
-                    return null;
-                }
+        try (PartitionTimestampCursor scan = 
mvDataStorage.scan(readTimestamp)) {
+            while (scan.hasNext()) {
+                ReadResult readResult = scan.next();
 
-                ReadResult committedReadResult = mvDataStorage.read(rowId, 
readResult.newestCommitTimestamp());
+                for (ByteBuffer searchKey : keyRows) {
+                    if (readResult.binaryRow() == null) {
+                        HybridTimestamp newestCommitTimestamp = 
readResult.newestCommitTimestamp();
+                        if (newestCommitTimestamp == null) {
+                            throw new AssertionError("Unexpected null value of 
the newest committed timestamp.");
+                        }
 
-                assert !committedReadResult.isWriteIntent() :
-                        "The result is not committed [rowId=" + rowId + ", 
timestamp="
-                                + readResult.newestCommitTimestamp() + ']';
+                        BinaryRow candidate = 
scan.committed(readResult.newestCommitTimestamp());

Review Comment:
   Try to use readTimestamp as for the single get operation.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java:
##########
@@ -77,6 +95,20 @@ public interface InternalTable extends AutoCloseable {
      */
     CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRowEx> 
keyRows, @Nullable InternalTransaction tx);
 
+    /**
+     * Asynchronously get rows from the table within the context of read-only 
transaction.
+     *
+     * @param keyRows Rows with key columns set.
+     * @param tx      The transaction.
+     * @return Future representing pending completion of the operation.
+     */
+    CompletableFuture<Collection<BinaryRow>> getAll(
+            Collection<BinaryRowEx> keyRows,
+            @Nullable InternalTransaction tx,
+            @NotNull ClusterNode recipientNode

Review Comment:
   Add to javadoc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to