sanpwc commented on a change in pull request #348:
URL: https://github.com/apache/ignite-3/pull/348#discussion_r718555919



##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +338,151 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Partition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** {@link Publisher<BinaryRow>} that relatively notifies about 
partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /** */
+        private AtomicBoolean wasSubscribed;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link RaftGroupService} to run corresponding 
raft commands.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.raftGrpSvc = raftGrpSvc;
+            this.wasSubscribed = new AtomicBoolean(false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void subscribe(Subscriber<? super BinaryRow> 
subscriber) {
+            if (subscriber == null)
+                throw new NullPointerException("Subscriber is null");
+
+            if (!wasSubscribed.compareAndSet(false, true))
+                subscriber.onError(new IllegalStateException("Scan publisher 
does not support multiple subscriptions."));
+
+            PartitionScanSubscription subscription = new 
PartitionScanSubscription(subscriber);
+
+            subscriber.onSubscribe(subscription);
+        }
+
+        /**
+         * Partition Scan Subscription.
+         */
+        private class PartitionScanSubscription implements Subscription {
+            /** */
+            private final Subscriber<? super BinaryRow> subscriber;
+
+            /** */
+            private final AtomicBoolean isCanceled;

Review comment:
       Fixed.

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +338,151 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Partition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** {@link Publisher<BinaryRow>} that relatively notifies about 
partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /** */
+        private AtomicBoolean wasSubscribed;

Review comment:
       ok, fixed.

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +338,151 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Partition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** {@link Publisher<BinaryRow>} that relatively notifies about 
partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /** */
+        private AtomicBoolean wasSubscribed;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link RaftGroupService} to run corresponding 
raft commands.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.raftGrpSvc = raftGrpSvc;
+            this.wasSubscribed = new AtomicBoolean(false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void subscribe(Subscriber<? super BinaryRow> 
subscriber) {
+            if (subscriber == null)

Review comment:
       Nope, it's part of the Publisher contract:
https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html
   ```
   Throws:
       NullPointerException - if subscriber is null
   ```

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -44,16 +51,29 @@
 import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
 import 
org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
 import 
org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import 
org.apache.ignite.internal.table.distributed.command.scan.ScanCloseCommand;
+import 
org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand;
+import 
org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.lang.LoggerMessageHelper;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.schema.definition.SchemaManagementMode;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Storage of table rows.
  */
 public class InternalTableImpl implements InternalTable {
+    /** Log. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(InternalTableImpl.class);
+
+    /** IgniteUuid generator. */
+    private final IgniteUuidGenerator UUID_GENERATOR = new 
IgniteUuidGenerator(UUID.randomUUID(), 0);

Review comment:
       Let it be static.

##########
File path: 
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
##########
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link InternalTable#scan(int, org.apache.ignite.tx.Transaction)}
+ */
+@ExtendWith(MockitoExtension.class)
+public class ITInternalTableScanTest {
+    /** */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new 
TestScaleCubeClusterServiceFactory();
+
+    /** */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = 
new MessageSerializationRegistryImpl();
+
+    /** */
+    private static final RaftMessagesFactory FACTORY = new 
RaftMessagesFactory();
+
+    /** */
+    private static final String TEST_TABLE_NAME = "testTbl";
+
+    /** Mock partition storage. */
+    @Mock
+    private Storage mockStorage;
+
+    /** */
+    private ClusterService network;
+
+    /** */
+    private RaftServer raftSrv;
+
+    /** Internal table to test. */
+    private InternalTable internalTbl;
+
+    /**
+     * Prepare test environment:
+     * <ol>
+     * <li>Start network node.</li>
+     * <li>Start raft server.</li>
+     * <li>Prepare partitioned raft group.</li>
+     * <li>Prepare partitioned raft group service.</li>
+     * <li>Prepare internal table as a test object.</li>
+     * </ol>
+     *
+     * @throws Exception If any.
+     */
+    @BeforeEach
+    public void setUp() throws Exception {
+        NetworkAddress nodeNetworkAddress = new NetworkAddress("localhost", 
20_000);
+
+        network = ClusterServiceTestUtils.clusterService(
+            "Node" + 20_000,
+            20_000,
+            new StaticNodeFinder(List.of(nodeNetworkAddress)),
+            SERIALIZATION_REGISTRY,
+            NETWORK_FACTORY
+        );
+
+        network.start();
+
+        raftSrv = new RaftServerImpl(network, FACTORY);
+
+        raftSrv.start();
+
+        String grpName = "test_part_grp";
+
+        List<Peer> conf = List.of(new Peer(nodeNetworkAddress));
+
+        mockStorage = mock(Storage.class);
+
+        raftSrv.startRaftGroup(
+            grpName,
+            new PartitionListener(mockStorage),
+            conf
+        );
+
+        RaftGroupService raftGrpSvc = RaftGroupServiceImpl.start(
+            grpName,
+            network,
+            FACTORY,
+            10_000,
+            conf,
+            true,
+            200
+        ).get(3, TimeUnit.SECONDS);
+
+        internalTbl = new InternalTableImpl(
+            TEST_TABLE_NAME,
+            new IgniteUuidGenerator(UUID.randomUUID(), 0).randomUuid(),
+            Map.of(0, raftGrpSvc),
+            1
+        );
+    }
+
+    /**
+     * Cleanup previously started network and raft server.
+     *
+     * @throws Exception If failed to stop component.
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (raftSrv != null)
+            raftSrv.beforeNodeStop();
+
+        if (network != null)
+            network.beforeNodeStop();
+
+        if (raftSrv != null)
+            raftSrv.stop();
+
+        if (network != null)
+            network.stop();
+    }
+
+    /**
+     * Checks whether publisher provides all existing data and then completes 
if requested by one row at a time.
+     */
+    @Test
+    public void testOneRowScan() throws Exception {
+        requestNTest(
+            List.of(
+                prepareDataRow("key1", "val1"),
+                prepareDataRow("key2", "val2")
+            ),
+            1);
+    }
+
+    /**
+     * Checks whether publisher provides all existing data and then completes 
if requested by multiple rows at a time.
+     */
+    @Test
+    public void testMultipleRowScan() throws Exception {
+        requestNTest(
+            List.of(
+                prepareDataRow("key1", "val1"),
+                prepareDataRow("key2", "val2"),
+                prepareDataRow("key3", "val3"),
+                prepareDataRow("key4", "val4"),
+                prepareDataRow("key5", "val5")
+            ),
+            2);
+    }
+
+    /**
+     * Checks whether {@link IllegalArgumentException} is thrown and inner 
storage cursor is closed in case of invalid
+     * requested amount of items.
+     *
+     * @throws Exception If any.
+     */
+    @Test()
+    public void testInvalidRequestedAmountScan() throws Exception {
+        AtomicBoolean cursorClosed = new AtomicBoolean(false);
+
+        when(mockStorage.scan(any())).thenAnswer(invocation -> {
+            var cursor = mock(Cursor.class);
+
+            doAnswer(
+                invocationClose -> {
+                    cursorClosed.set(true);
+                    return null;
+                }
+            ).when(cursor).close();
+
+            when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+                throw new StorageException("test");
+            });
+
+            return cursor;
+        });
+
+        for (long n : new long[] {-1, 0}) {
+            AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+            cursorClosed.set(false);
+
+            internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+                @Override public void onSubscribe(Subscription subscription) {
+                    subscription.request(n);
+                }
+
+                @Override public void onNext(BinaryRow item) {
+                    fail("Should never get here.");
+                }
+
+                @Override public void onError(Throwable throwable) {
+                    gotException.set(throwable);
+                }
+
+                @Override public void onComplete() {
+                    fail("Should never get here.");
+                }
+            });
+
+            assertTrue(waitForCondition(() -> gotException.get() != null, 
1_000));
+
+            assertTrue(waitForCondition(cursorClosed::get, 1_000));
+
+            assertThrows(
+                IllegalArgumentException.class,
+                () -> {
+                    throw gotException.get();
+                }
+            );
+        }
+    }
+
+    /**
+     * Checks that exception from storage cursors has next properly propagates 
to subscriber.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15581")
+    @Test
+    public void testExceptionRowScanCursorHasNext() throws Exception {
+        AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+        AtomicBoolean cursorClosed = new AtomicBoolean(false);
+
+        when(mockStorage.scan(any())).thenAnswer(invocation -> {
+            var cursor = mock(Cursor.class);
+
+            when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+                throw new StorageException("test");
+            });
+
+            doAnswer(
+                invocationClose -> {
+                    cursorClosed.set(true);
+                    return null;
+                }
+            ).when(cursor).close();
+
+            return cursor;
+        });
+
+        internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+
+            @Override public void onSubscribe(Subscription subscription) {
+                subscription.request(1);
+            }
+
+            @Override public void onNext(BinaryRow item) {
+                fail("Should never get here.");
+            }
+
+            @Override public void onError(Throwable throwable) {
+                gotException.set(throwable);
+            }
+
+            @Override public void onComplete() {
+                fail("Should never get here.");
+            }
+        });
+
+        assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+        assertEquals(gotException.get().getCause().getClass(), 
StorageException.class);
+
+        assertTrue(waitForCondition(cursorClosed::get, 1_000));
+    }
+
+    /**
+     * Checks that exception from storage cursor creation properly propagates 
to subscriber.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15581")
+    @Test
+    public void testExceptionRowScan() throws Exception {
+        AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+        when(mockStorage.scan(any())).thenThrow(new StorageException("Some 
storage exception"));
+
+        internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+
+            @Override public void onSubscribe(Subscription subscription) {
+                subscription.request(1);
+            }
+
+            @Override public void onNext(BinaryRow item) {
+                fail("Should never get here.");
+            }
+
+            @Override public void onError(Throwable throwable) {
+                gotException.set(throwable);
+            }
+
+            @Override public void onComplete() {
+                fail("Should never get here.");
+            }
+        });
+
+        assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+        assertEquals(gotException.get().getCause().getClass(), 
StorageException.class);
+    }
+
+
+    /**
+     * Checks that {@link IllegalArgumentException} is thrown in case of 
invalid partition.

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to