agura commented on a change in pull request #348:
URL: https://github.com/apache/ignite-3/pull/348#discussion_r717905768
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -44,16 +51,29 @@
import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
import
org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
import
org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import
org.apache.ignite.internal.table.distributed.command.scan.ScanCloseCommand;
+import
org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand;
+import
org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.schema.definition.SchemaManagementMode;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Storage of table rows.
*/
public class InternalTableImpl implements InternalTable {
+ /** Log. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(InternalTableImpl.class);
+
+ /** IgniteUuid generator. */
+ private final IgniteUuidGenerator UUID_GENERATOR = new
IgniteUuidGenerator(UUID.randomUUID(), 0);
Review comment:
The name of a non-static field should be in lowerCamelCase (e.g. `uuidGenerator`), not UPPER_SNAKE_CASE, which is reserved for constants.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +338,151 @@ private int partId(BinaryRow row) {
return list;
});
}
+
+ /** Partition scan publisher. */
+ private class PartitionScanPublisher implements Publisher<BinaryRow> {
+ /** {@link Publisher<BinaryRow>} that relatively notifies about
partition rows. */
+ private final RaftGroupService raftGrpSvc;
+
+ /** */
+ private AtomicBoolean wasSubscribed;
Review comment:
The word `was` seems redundant here; just `subscribed` is enough.
##########
File path:
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
##########
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link InternalTable#scan(int, org.apache.ignite.tx.Transaction)}
+ */
+@ExtendWith(MockitoExtension.class)
+public class ITInternalTableScanTest {
+ /** */
+ private static final ClusterServiceFactory NETWORK_FACTORY = new
TestScaleCubeClusterServiceFactory();
+
+ /** */
+ private static final MessageSerializationRegistry SERIALIZATION_REGISTRY =
new MessageSerializationRegistryImpl();
+
+ /** */
+ private static final RaftMessagesFactory FACTORY = new
RaftMessagesFactory();
+
+ /** */
+ private static final String TEST_TABLE_NAME = "testTbl";
+
+ /** Mock partition storage. */
+ @Mock
+ private Storage mockStorage;
+
+ /** */
+ private ClusterService network;
+
+ /** */
+ private RaftServer raftSrv;
+
+ /** Internal table to test. */
+ private InternalTable internalTbl;
+
+ /**
+ * Prepare test environment:
+ * <ol>
+ * <li>Start network node.</li>
+ * <li>Start raft server.</li>
+ * <li>Prepare partitioned raft group.</li>
+ * <li>Prepare partitioned raft group service.</li>
+ * <li>Prepare internal table as a test object.</li>
+ * </ol>
+ *
+ * @throws Exception If any.
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ NetworkAddress nodeNetworkAddress = new NetworkAddress("localhost",
20_000);
+
+ network = ClusterServiceTestUtils.clusterService(
+ "Node" + 20_000,
+ 20_000,
+ new StaticNodeFinder(List.of(nodeNetworkAddress)),
+ SERIALIZATION_REGISTRY,
+ NETWORK_FACTORY
+ );
+
+ network.start();
+
+ raftSrv = new RaftServerImpl(network, FACTORY);
+
+ raftSrv.start();
+
+ String grpName = "test_part_grp";
+
+ List<Peer> conf = List.of(new Peer(nodeNetworkAddress));
+
+ mockStorage = mock(Storage.class);
+
+ raftSrv.startRaftGroup(
+ grpName,
+ new PartitionListener(mockStorage),
+ conf
+ );
+
+ RaftGroupService raftGrpSvc = RaftGroupServiceImpl.start(
+ grpName,
+ network,
+ FACTORY,
+ 10_000,
+ conf,
+ true,
+ 200
+ ).get(3, TimeUnit.SECONDS);
+
+ internalTbl = new InternalTableImpl(
+ TEST_TABLE_NAME,
+ new IgniteUuidGenerator(UUID.randomUUID(), 0).randomUuid(),
+ Map.of(0, raftGrpSvc),
+ 1
+ );
+ }
+
+ /**
+ * Cleanup previously started network and raft server.
+ *
+ * @throws Exception If failed to stop component.
+ */
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (raftSrv != null)
+ raftSrv.beforeNodeStop();
+
+ if (network != null)
+ network.beforeNodeStop();
+
+ if (raftSrv != null)
+ raftSrv.stop();
+
+ if (network != null)
+ network.stop();
+ }
+
+ /**
+ * Checks whether publisher provides all existing data and then completes
if requested by one row at a time.
+ */
+ @Test
+ public void testOneRowScan() throws Exception {
+ requestNTest(
+ List.of(
+ prepareDataRow("key1", "val1"),
+ prepareDataRow("key2", "val2")
+ ),
+ 1);
+ }
+
+ /**
+ * Checks whether publisher provides all existing data and then completes
if requested by multiple rows at a time.
+ */
+ @Test
+ public void testMultipleRowScan() throws Exception {
+ requestNTest(
+ List.of(
+ prepareDataRow("key1", "val1"),
+ prepareDataRow("key2", "val2"),
+ prepareDataRow("key3", "val3"),
+ prepareDataRow("key4", "val4"),
+ prepareDataRow("key5", "val5")
+ ),
+ 2);
+ }
+
+ /**
+ * Checks whether {@link IllegalArgumentException} is thrown and inner
storage cursor is closes in case of invalid
+ * requested amount of items.
+ *
+ * @throws Exception If any.
+ */
+ @Test()
+ public void testInvalidRequestedAmountScan() throws Exception {
+ AtomicBoolean cursorClosed = new AtomicBoolean(false);
+
+ when(mockStorage.scan(any())).thenAnswer(invocation -> {
+ var cursor = mock(Cursor.class);
+
+ doAnswer(
+ invocationClose -> {
+ cursorClosed.set(true);
+ return null;
+ }
+ ).when(cursor).close();
+
+ when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+ throw new StorageException("test");
+ });
+
+ return cursor;
+ });
+
+ for (long n : new long[] {-1, 0}) {
+ AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+ cursorClosed.set(false);
+
+ internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+ @Override public void onSubscribe(Subscription subscription) {
+ subscription.request(n);
+ }
+
+ @Override public void onNext(BinaryRow item) {
+ fail("Should never get here.");
+ }
+
+ @Override public void onError(Throwable throwable) {
+ gotException.set(throwable);
+ }
+
+ @Override public void onComplete() {
+ fail("Should never get here.");
+ }
+ });
+
+ assertTrue(waitForCondition(() -> gotException.get() != null,
1_000));
+
+ assertTrue(waitForCondition(cursorClosed::get, 1_000));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ throw gotException.get();
+ }
+ );
+ }
+ }
+
+ /**
+ * Checks that exception from storage cursors has next properly propagates
to subscriber.
+ */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-15581")
+ @Test
+ public void testExceptionRowScanCursorHasNext() throws Exception {
+ AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+ AtomicBoolean cursorClosed = new AtomicBoolean(false);
+
+ when(mockStorage.scan(any())).thenAnswer(invocation -> {
+ var cursor = mock(Cursor.class);
+
+ when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+ throw new StorageException("test");
+ });
+
+ doAnswer(
+ invocationClose -> {
+ cursorClosed.set(true);
+ return null;
+ }
+ ).when(cursor).close();
+
+ return cursor;
+ });
+
+ internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+
+ @Override public void onSubscribe(Subscription subscription) {
+ subscription.request(1);
+ }
+
+ @Override public void onNext(BinaryRow item) {
+ fail("Should never get here.");
+ }
+
+ @Override public void onError(Throwable throwable) {
+ gotException.set(throwable);
+ }
+
+ @Override public void onComplete() {
+ fail("Should never get here.");
+ }
+ });
+
+ assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+ assertEquals(gotException.get().getCause().getClass(),
StorageException.class);
+
+ assertTrue(waitForCondition(cursorClosed::get, 1_000));
+ }
+
+ /**
+ * Checks that exception from storage cursor creation properly propagates
to subscriber.
+ */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-15581")
+ @Test
+ public void testExceptionRowScan() throws Exception {
+ AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+ when(mockStorage.scan(any())).thenThrow(new StorageException("Some
storage exception"));
+
+ internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+
+ @Override public void onSubscribe(Subscription subscription) {
+ subscription.request(1);
+ }
+
+ @Override public void onNext(BinaryRow item) {
+ fail("Should never get here.");
+ }
+
+ @Override public void onError(Throwable throwable) {
+ gotException.set(throwable);
+ }
+
+ @Override public void onComplete() {
+ fail("Should never get here.");
+ }
+ });
+
+ assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+ assertEquals(gotException.get().getCause().getClass(),
StorageException.class);
+ }
+
+
+ /**
+ * Checks that {@link IllegalArgumentException} is thrown in case of
invalid parition.
Review comment:
`partition`, not `parition`
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +338,151 @@ private int partId(BinaryRow row) {
return list;
});
}
+
+ /** Partition scan publisher. */
+ private class PartitionScanPublisher implements Publisher<BinaryRow> {
+ /** {@link Publisher<BinaryRow>} that relatively notifies about
partition rows. */
+ private final RaftGroupService raftGrpSvc;
+
+ /** */
+ private AtomicBoolean wasSubscribed;
+
+ /**
+ * The constructor.
+ *
+ * @param raftGrpSvc {@link RaftGroupService} to run corresponding
raft commands.
+ */
+ PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+ this.raftGrpSvc = raftGrpSvc;
+ this.wasSubscribed = new AtomicBoolean(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void subscribe(Subscriber<? super BinaryRow>
subscriber) {
+ if (subscriber == null)
Review comment:
This is a non-public API, so use an `assert` for the null check instead of throwing a `NullPointerException`.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +338,151 @@ private int partId(BinaryRow row) {
return list;
});
}
+
+ /** Partition scan publisher. */
+ private class PartitionScanPublisher implements Publisher<BinaryRow> {
+ /** {@link Publisher<BinaryRow>} that relatively notifies about
partition rows. */
+ private final RaftGroupService raftGrpSvc;
+
+ /** */
+ private AtomicBoolean wasSubscribed;
+
+ /**
+ * The constructor.
+ *
+ * @param raftGrpSvc {@link RaftGroupService} to run corresponding
raft commands.
+ */
+ PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+ this.raftGrpSvc = raftGrpSvc;
+ this.wasSubscribed = new AtomicBoolean(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void subscribe(Subscriber<? super BinaryRow>
subscriber) {
+ if (subscriber == null)
+ throw new NullPointerException("Subscriber is null");
+
+ if (!wasSubscribed.compareAndSet(false, true))
+ subscriber.onError(new IllegalStateException("Scan publisher
does not support multiple subscriptions."));
+
+ PartitionScanSubscription subscription = new
PartitionScanSubscription(subscriber);
+
+ subscriber.onSubscribe(subscription);
+ }
+
+ /**
+ * Partition Scan Subscription.
+ */
+ private class PartitionScanSubscription implements Subscription {
+ /** */
+ private final Subscriber<? super BinaryRow> subscriber;
+
+ /** */
+ private final AtomicBoolean isCanceled;
Review comment:
Just `canceled`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]