korlov42 commented on a change in pull request #348:
URL: https://github.com/apache/ignite-3/pull/348#discussion_r713795918
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
##########
@@ -195,5 +196,13 @@
CompletableFuture<Collection<BinaryRow>>
deleteAllExact(Collection<BinaryRow> rows,
@Nullable Transaction tx);
- //TODO: IGNTIE-14488. Add invoke() methods.
+ /**
+ * Scans given partition, providing {@link Publisher<BinaryRow>} that
reflectively notifies about partition rows.
+ * @param p The partition.
+ * @param tx The transaction.
+ * @return {@link Publisher<BinaryRow>} t{@link Publisher<BinaryRow>} that
reflectively notifies about partition rows.
+ */
+ @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx);
Review comment:
```suggestion
/**
* Scans given partition, providing {@link Publisher<BinaryRow>} that
reflectively notifies about partition rows.
*
* @param p The partition.
* @param tx The transaction.
* @return {@link Publisher<BinaryRow>} that reflectively notifies about
partition rows.
*/
@NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx);
```
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void
handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
clo.result(new SingleRowResponse(response));
}
+ /**
+ * Handler for the {@link ScanInitCommand}.
+ *
+ * @param clo Command closure.
+ */
+ private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+ ScanInitCommand rangeCmd = clo.command();
+
+ IgniteUuid cursorId = rangeCmd.scanId();
+
+ Cursor<DataRow> cursor = storage.scan(key -> true);
+
+ cursors.put(
+ cursorId,
+ new CursorMeta(
+ cursor,
+ rangeCmd.requesterNodeId()
+ )
+ );
+
+ clo.result(null);
+ }
+
+ /**
+ * Handler for the {@link ScanRetrieveBatchCommand}.
+ *
+ * @param clo Command closure.
+ */
+ private void
handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+ CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+ if (cursorDesc == null) {
+ clo.result(new NoSuchElementException("Corresponding cursor on
server side not found."));
+
+ return;
+ }
+
+ List<BinaryRow> res = new ArrayList<>();
+
+ try {
+ for (int i = 0; i < clo.command().itemsToRetrieveCount() &&
cursorDesc.cursor().hasNext(); i++)
+ res.add(new
ByteBufferRow(cursorDesc.cursor().next().valueBytes()));
+ }
+ catch (Exception e) {
+ clo.result(e);
+ }
+
+ clo.result(new MultiRowsResponse(res));
+ }
+
+ /**
+ * Handler for the {@link ScanCloseCommand}.
+ *
+ * @param clo Command closure.
+ */
+ private void handleScanCloseCommand(CommandClosure<ScanCloseCommand> clo) {
+ CursorMeta cursorDesc = cursors.get(clo.command().scanId());
Review comment:
We probably need to remove the cursor from the map at this point, not just
get it.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void
handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
clo.result(new SingleRowResponse(response));
}
+ /**
+ * Handler for the {@link ScanInitCommand}.
+ *
+ * @param clo Command closure.
+ */
+ private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+ ScanInitCommand rangeCmd = clo.command();
+
+ IgniteUuid cursorId = rangeCmd.scanId();
+
+ Cursor<DataRow> cursor = storage.scan(key -> true);
+
+ cursors.put(
+ cursorId,
+ new CursorMeta(
+ cursor,
+ rangeCmd.requesterNodeId()
+ )
+ );
+
+ clo.result(null);
+ }
+
+ /**
+ * Handler for the {@link ScanRetrieveBatchCommand}.
+ *
+ * @param clo Command closure.
+ */
+ private void
handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+ CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+ if (cursorDesc == null) {
+ clo.result(new NoSuchElementException("Corresponding cursor on
server side not found."));
+
+ return;
+ }
+
+ List<BinaryRow> res = new ArrayList<>();
+
+ try {
+ for (int i = 0; i < clo.command().itemsToRetrieveCount() &&
cursorDesc.cursor().hasNext(); i++)
Review comment:
The type of `ScanRetrieveBatchCommand#itemsToRetrieveCount` is `long`, so if
someone requests `Integer.MAX_VALUE + 1` rows, the loop will read the cursor
to the very end because the `int` counter will overflow.
Another problem here: I doubt an ArrayList can hold more than
`Integer.MAX_VALUE` rows (assuming the heap size is not in question here).
With that said, I propose changing the type of
`ScanRetrieveBatchCommand#itemsToRetrieveCount` to `int`.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -494,4 +587,42 @@ private void
handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
public Storage getStorage() {
return storage;
}
+
+ /**
+ * Cursor meta information: origin node id and type.
+ */
+ private class CursorMeta {
+ /** Cursor. */
+ private final Cursor<DataRow> cursor;
+
+ /** Id of the node that creates cursor. */
+ private final String requesterNodeId;
+
+ /**
+ * The constructor.
+ *
+ * @param cursor Cursor.
+ * @param requesterNodeId Id of the node that creates cursor.
+ */
+ CursorMeta(Cursor<DataRow> cursor,
Review comment:
```suggestion
CursorMeta(
Cursor<DataRow> cursor,
```
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -310,4 +344,117 @@ private int partId(BinaryRow row) {
return list;
});
}
+
+ /** Parition scan publisher. */
+ private class PartitionScanPublisher implements Publisher<BinaryRow> {
+ /** List of subscriptions. */
+ private final List<PartitionScanSubscription> subscriptions;
+
+ /** {@link Publisher<BinaryRow>} that relatively notifies about
partition rows. */
+ private final RaftGroupService raftGrpSvc;
+
+ /**
+ * The constructor.
+ *
+ * @param raftGrpSvc {@link Publisher<BinaryRow>} that relatively
notifies about partition rows.
+ */
+ PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+ this.subscriptions = Collections.synchronizedList(new
ArrayList<>());
+ this.raftGrpSvc = raftGrpSvc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void subscribe(Subscriber<? super BinaryRow>
subscriber) {
+ PartitionScanSubscription subscription = new
PartitionScanSubscription(subscriber);
+
+ subscriptions.add(subscription);
Review comment:
For the first iteration I would prefer to introduce a `one subscriber per
publisher` restriction. Otherwise, what should we do in the case of unbalanced
consumption? Assume one subscriber requests only one row and another requests
1000 rows. Should we maintain a pending queue for the first subscriber?
Currently we push each row to all subscribers regardless of the actual number
of requested rows, and I doubt that is the desired behaviour.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void
handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
clo.result(new SingleRowResponse(response));
}
+ /**
+ * Handler for the {@link ScanInitCommand}.
+ *
+ * @param clo Command closure.
+ */
+ private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+ ScanInitCommand rangeCmd = clo.command();
+
+ IgniteUuid cursorId = rangeCmd.scanId();
+
+ Cursor<DataRow> cursor = storage.scan(key -> true);
Review comment:
The `scan` method actually throws exceptions, so let's wrap this
invocation in a `try-catch` and add an appropriate test to verify everything
is OK.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void
handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
clo.result(new SingleRowResponse(response));
}
+ /**
+ * Handler for the {@link ScanInitCommand}.
+ *
+ * @param clo Command closure.
+ */
+ private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+ ScanInitCommand rangeCmd = clo.command();
+
+ IgniteUuid cursorId = rangeCmd.scanId();
+
+ Cursor<DataRow> cursor = storage.scan(key -> true);
+
+ cursors.put(
+ cursorId,
+ new CursorMeta(
+ cursor,
+ rangeCmd.requesterNodeId()
+ )
+ );
+
+ clo.result(null);
+ }
+
+ /**
+ * Handler for the {@link ScanRetrieveBatchCommand}.
+ *
+ * @param clo Command closure.
+ */
+ private void
handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+ CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+ if (cursorDesc == null) {
+ clo.result(new NoSuchElementException("Corresponding cursor on
server side not found."));
+
+ return;
+ }
+
+ List<BinaryRow> res = new ArrayList<>();
+
+ try {
+ for (int i = 0; i < clo.command().itemsToRetrieveCount() &&
cursorDesc.cursor().hasNext(); i++)
+ res.add(new
ByteBufferRow(cursorDesc.cursor().next().valueBytes()));
+ }
+ catch (Exception e) {
+ clo.result(e);
+ }
+
+ clo.result(new MultiRowsResponse(res));
+ }
+
+ /**
+ * Handler for the {@link ScanCloseCommand}.
+ *
+ * @param clo Command closure.
+ */
+ private void handleScanCloseCommand(CommandClosure<ScanCloseCommand> clo) {
+ CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+ if (cursorDesc == null) {
+ clo.result(null);
+
+ return;
+ }
+
+ try {
+ cursorDesc.cursor().close();
+ }
+ catch (Exception e) {
+ throw new IgniteInternalException(e);
Review comment:
Should we complete the command closure with this exception?
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -310,4 +344,117 @@ private int partId(BinaryRow row) {
return list;
});
}
+
+ /** Parition scan publisher. */
Review comment:
`Parition` is misspelled; it should be `Partition`.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -310,4 +344,117 @@ private int partId(BinaryRow row) {
return list;
});
}
+
+ /** Parition scan publisher. */
+ private class PartitionScanPublisher implements Publisher<BinaryRow> {
+ /** List of subscriptions. */
+ private final List<PartitionScanSubscription> subscriptions;
+
+ /** {@link Publisher<BinaryRow>} that relatively notifies about
partition rows. */
+ private final RaftGroupService raftGrpSvc;
+
+ /**
+ * The constructor.
+ *
+ * @param raftGrpSvc {@link Publisher<BinaryRow>} that relatively
notifies about partition rows.
Review comment:
Wrong param description: it describes a `Publisher<BinaryRow>`, but the
parameter is a `RaftGroupService`.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -310,4 +344,117 @@ private int partId(BinaryRow row) {
return list;
});
}
+
+ /** Parition scan publisher. */
+ private class PartitionScanPublisher implements Publisher<BinaryRow> {
+ /** List of subscriptions. */
+ private final List<PartitionScanSubscription> subscriptions;
+
+ /** {@link Publisher<BinaryRow>} that relatively notifies about
partition rows. */
+ private final RaftGroupService raftGrpSvc;
+
+ /**
+ * The constructor.
+ *
+ * @param raftGrpSvc {@link Publisher<BinaryRow>} that relatively
notifies about partition rows.
+ */
+ PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+ this.subscriptions = Collections.synchronizedList(new
ArrayList<>());
+ this.raftGrpSvc = raftGrpSvc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void subscribe(Subscriber<? super BinaryRow>
subscriber) {
+ PartitionScanSubscription subscription = new
PartitionScanSubscription(subscriber);
+
+ subscriptions.add(subscription);
+
+ subscriber.onSubscribe(subscription);
+ }
+
+ /**
+ * Partition Scan Subscription.
+ */
+ private class PartitionScanSubscription implements Subscription {
+ /** */
+ private final Subscriber<? super BinaryRow> subscriber;
+
+ /** */
+ private final AtomicBoolean isCanceled;
+
+ /** Scan id to uniquely identify it on server side. */
+ private final IgniteUuid scanId;
+
+ /** Scan initial operation that created server cursor. */
+ private final CompletableFuture<Void> scanInitOp;
+
+ /**
+ * The constructor.
+ * @param subscriber The subscriber.
+ */
+ private PartitionScanSubscription(Subscriber<? super BinaryRow>
subscriber) {
+ this.subscriber = subscriber;
+ this.isCanceled = new AtomicBoolean(false);
+ this.scanId = UUID_GENERATOR.randomUuid();
+ // TODO: IGNITE-15544 Close partition scans on node left.
+ this.scanInitOp = raftGrpSvc.run(new ScanInitCommand("",
scanId));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void request(long n) {
+ if (n < 0) {
+ cancel();
+
+ subscriber.onError(new IllegalArgumentException("Requested
amount of items is less than 0."));
+ }
+
+ if (isCanceled.get())
+ return;
+
+ scanInitOp.thenCompose((none) ->
raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(n, scanId)))
+ .thenAccept(
+ res -> {
+ if (res.getValues() == null) {
+ raftGrpSvc.run(new
ScanCloseCommand(scanId)).exceptionally(closeT -> {
+ LOG.warn("Unable to close scan.", closeT);
+
+ return null;
+ });
Review comment:
```suggestion
cancel();
```
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
##########
@@ -195,5 +196,13 @@
CompletableFuture<Collection<BinaryRow>>
deleteAllExact(Collection<BinaryRow> rows,
@Nullable Transaction tx);
- //TODO: IGNTIE-14488. Add invoke() methods.
+ /**
+ * Scans given partition, providing {@link Publisher<BinaryRow>} that
reflectively notifies about partition rows.
+ * @param p The partition.
+ * @param tx The transaction.
+ * @return {@link Publisher<BinaryRow>} t{@link Publisher<BinaryRow>} that
reflectively notifies about partition rows.
+ */
+ @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx);
Review comment:
BTW `reflectively` or `reactively`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]