sanpwc commented on a change in pull request #348:
URL: https://github.com/apache/ignite-3/pull/348#discussion_r714753614
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -310,4 +344,117 @@ private int partId(BinaryRow row) {
return list;
});
}
+
+ /** Partition scan publisher. */
+ private class PartitionScanPublisher implements Publisher<BinaryRow> {
+ /** List of subscriptions. */
+ private final List<PartitionScanSubscription> subscriptions;
+
+ /** Raft group service on which the scan commands are run. */
+ private final RaftGroupService raftGrpSvc;
+
+ /**
+ * The constructor.
+ *
+ * @param raftGrpSvc Raft group service on which the scan commands are run.
+ */
+ PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+ this.subscriptions = Collections.synchronizedList(new
ArrayList<>());
+ this.raftGrpSvc = raftGrpSvc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void subscribe(Subscriber<? super BinaryRow>
subscriber) {
+ PartitionScanSubscription subscription = new
PartitionScanSubscription(subscriber);
+
+ subscriptions.add(subscription);
+
+ subscriber.onSubscribe(subscription);
+ }
+
+ /**
+ * Partition Scan Subscription.
+ */
+ private class PartitionScanSubscription implements Subscription {
+ /** */
+ private final Subscriber<? super BinaryRow> subscriber;
+
+ /** */
+ private final AtomicBoolean isCanceled;
+
+ /** Scan id to uniquely identify it on server side. */
+ private final IgniteUuid scanId;
+
+ /** Scan initial operation that created server cursor. */
+ private final CompletableFuture<Void> scanInitOp;
+
+ /**
+ * The constructor.
+ * @param subscriber The subscriber.
+ */
+ private PartitionScanSubscription(Subscriber<? super BinaryRow>
subscriber) {
+ this.subscriber = subscriber;
+ this.isCanceled = new AtomicBoolean(false);
+ this.scanId = UUID_GENERATOR.randomUuid();
+ // TODO: IGNITE-15544 Close partition scans on node left.
+ this.scanInitOp = raftGrpSvc.run(new ScanInitCommand("",
scanId));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void request(long n) {
+ if (n < 0) {
+ cancel();
+
+ subscriber.onError(new IllegalArgumentException("Requested
amount of items is less than 0."));
+ }
+
+ if (isCanceled.get())
+ return;
+
+ scanInitOp.thenCompose((none) ->
raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(n, scanId)))
+ .thenAccept(
+ res -> {
+ if (res.getValues() == null) {
+ raftGrpSvc.run(new
ScanCloseCommand(scanId)).exceptionally(closeT -> {
+ LOG.warn("Unable to close scan.", closeT);
+
+ return null;
+ });
+
+ subscriber.onComplete();
Review comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]