sanpwc commented on a change in pull request #348:
URL: https://github.com/apache/ignite-3/pull/348#discussion_r716897957



##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void 
handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
         clo.result(new SingleRowResponse(response));
     }
 
+    /**
+     * Handler for the {@link ScanInitCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+        ScanInitCommand rangeCmd = clo.command();
+
+        IgniteUuid cursorId = rangeCmd.scanId();
+
+        Cursor<DataRow> cursor = storage.scan(key -> true);
+
+        cursors.put(
+            cursorId,
+            new CursorMeta(
+                cursor,
+                rangeCmd.requesterNodeId()
+            )
+        );
+
+        clo.result(null);
+    }
+
+    /**
+     * Handler for the {@link ScanRetrieveBatchCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void 
handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(new NoSuchElementException("Corresponding cursor on 
server side not found."));
+
+            return;
+        }
+
+        List<BinaryRow> res = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < clo.command().itemsToRetrieveCount() && 
cursorDesc.cursor().hasNext(); i++)
+                res.add(new 
ByteBufferRow(cursorDesc.cursor().next().valueBytes()));
+        }
+        catch (Exception e) {
+            clo.result(e);
+        }
+
+        clo.result(new MultiRowsResponse(res));
+    }
+
+    /**
+     * Handler for the {@link ScanCloseCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanCloseCommand(CommandClosure<ScanCloseCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(null);
+
+            return;
+        }
+
+        try {
+            cursorDesc.cursor().close();
+        }
+        catch (Exception e) {
+            throw new IgniteInternalException(e);

Review comment:
       > The exception doesn't matter actually. But what does matter is fact 
that something went wrong and there is no need for user code to wait for 
response, because error response is the response. Otherwise, they will wait for 
an answer that will never come.
   In case of throwing an exception from state machine raft node should be 
stopped and corresponding RaftException should be propagated to the client, so 
client won't wait for the answer any longer thаn in case of 
clo.result(exception),If it's not happening it's definitely a bug, however it's 
not in the scope of given ticket.
   > BTW I don't sure about cursorDesc.cursor().close(), but 
cursorDesc.cursor().hasNext() throws sometimes an AssertionError, hence I 
propose to catch Throwable.
   Well, I don't agree here. Only business logic exceptions should be processed 
with clo.result, all other including AssertionError should stop raft node and 
be propagated to client as RaftException, same as mentioned above actually.

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void 
handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
         clo.result(new SingleRowResponse(response));
     }
 
+    /**
+     * Handler for the {@link ScanInitCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+        ScanInitCommand rangeCmd = clo.command();
+
+        IgniteUuid cursorId = rangeCmd.scanId();
+
+        Cursor<DataRow> cursor = storage.scan(key -> true);
+
+        cursors.put(
+            cursorId,
+            new CursorMeta(
+                cursor,
+                rangeCmd.requesterNodeId()
+            )
+        );
+
+        clo.result(null);
+    }
+
+    /**
+     * Handler for the {@link ScanRetrieveBatchCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void 
handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(new NoSuchElementException("Corresponding cursor on 
server side not found."));
+
+            return;
+        }
+
+        List<BinaryRow> res = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < clo.command().itemsToRetrieveCount() && 
cursorDesc.cursor().hasNext(); i++)
+                res.add(new 
ByteBufferRow(cursorDesc.cursor().next().valueBytes()));
+        }
+        catch (Exception e) {
+            clo.result(e);
+        }
+
+        clo.result(new MultiRowsResponse(res));
+    }
+
+    /**
+     * Handler for the {@link ScanCloseCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanCloseCommand(CommandClosure<ScanCloseCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(null);
+
+            return;
+        }
+
+        try {
+            cursorDesc.cursor().close();
+        }
+        catch (Exception e) {
+            throw new IgniteInternalException(e);

Review comment:
       > The exception doesn't matter actually. But what does matter is fact 
that something went wrong and there is no need for user code to wait for 
response, because error response is the response. Otherwise, they will wait for 
an answer that will never come.
   
   In case of throwing an exception from state machine raft node should be 
stopped and corresponding RaftException should be propagated to the client, so 
client won't wait for the answer any longer thаn in case of 
clo.result(exception),If it's not happening it's definitely a bug, however it's 
not in the scope of given ticket.
   
   > BTW I don't sure about cursorDesc.cursor().close(), but 
cursorDesc.cursor().hasNext() throws sometimes an AssertionError, hence I 
propose to catch Throwable.
   
   Well, I don't agree here. Only business logic exceptions should be processed 
with clo.result, all other including AssertionError should stop raft node and 
be propagated to client as RaftException, same as mentioned above actually.

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +336,146 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Partition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** {@link Publisher<BinaryRow>} that relatively notifies about 
partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /** */
+        private AtomicBoolean wasSubscribed;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link RaftGroupService} to run corresponding 
raft commands.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.raftGrpSvc = raftGrpSvc;
+            this.wasSubscribed = new AtomicBoolean(false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void subscribe(Subscriber<? super BinaryRow> 
subscriber) {
+            if (subscriber == null)
+                throw new NullPointerException("Subscriber is null");
+
+            if (!wasSubscribed.compareAndSet(false, true))
+                subscriber.onError(new IllegalStateException("Scan publisher 
does not support multiple subscriptions."));
+
+            PartitionScanSubscription subscription = new 
PartitionScanSubscription(subscriber);
+
+            subscriber.onSubscribe(subscription);
+        }
+
+        /**
+         * Partition Scan Subscription.
+         */
+        private class PartitionScanSubscription implements Subscription {
+            /** */
+            private final Subscriber<? super BinaryRow> subscriber;
+
+            /** */
+            private final AtomicBoolean isCanceled;
+
+            /** Scan id to uniquely identify it on server side. */
+            private final IgniteUuid scanId;
+
+            /** Scan initial operation that created server cursor. */
+            private final CompletableFuture<Void> scanInitOp;
+
+            /**
+             * The constructor.
+             * @param subscriber The subscriber.
+             */
+            private PartitionScanSubscription(Subscriber<? super BinaryRow> 
subscriber) {
+                this.subscriber = subscriber;
+                this.isCanceled = new AtomicBoolean(false);
+                this.scanId = UUID_GENERATOR.randomUuid();
+                // TODO: IGNITE-15544 Close partition scans on node left.
+                this.scanInitOp = raftGrpSvc.run(new ScanInitCommand("", 
scanId));
+            }
+
+            /** {@inheritDoc} */
+            @Override public void request(long n) {
+                if (n <= 0) {
+                    cancel();
+
+                    subscriber.onError(new 
IllegalArgumentException(LoggerMessageHelper.
+                        format("Invalid requested amount of items 
[requested={}, minValue=1]", n))
+                    );
+                }
+
+                if (isCanceled.get())
+                    return;
+
+                final int internalBatchSize = Integer.MAX_VALUE;
+
+                for (int intBatchCnr = 0; intBatchCnr < (n / 
internalBatchSize); intBatchCnr++)
+                    scanBatch(internalBatchSize);
+
+                scanBatch((int)(n % internalBatchSize));
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                cancel(true);
+            }
+
+            /**
+             * Cancels given subscription and closes cursor if necessary.
+             *
+             * @param closeCursor If {@code true} closes inner storage scan.
+             */
+            private void cancel(boolean closeCursor) {
+                isCanceled.set(true);
+
+                if (closeCursor) {
+                    raftGrpSvc.run(new 
ScanCloseCommand(scanId)).exceptionally(closeT -> {
+                        LOG.warn("Unable to close scan.", closeT);
+
+                        return null;
+                    });
+                }
+            }
+
+            /**
+             * Requests and processes n requested elements where n is an 
integer.
+             *
+             * @param n Requested amount of items.
+             */
+            private void scanBatch(int n) {
+                if (isCanceled.get())
+                    return;
+
+                scanInitOp.thenCompose((none) -> 
raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(n, scanId)))

Review comment:
       > We need to check isCanceled flag right before invocation of raftGrpSvc 
.run(ScanRetrieveBatchCommand)
   Without extra synchronization that won't help.

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +336,146 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Partition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** {@link Publisher<BinaryRow>} that relatively notifies about 
partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /** */
+        private AtomicBoolean wasSubscribed;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link RaftGroupService} to run corresponding 
raft commands.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.raftGrpSvc = raftGrpSvc;
+            this.wasSubscribed = new AtomicBoolean(false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void subscribe(Subscriber<? super BinaryRow> 
subscriber) {
+            if (subscriber == null)
+                throw new NullPointerException("Subscriber is null");
+
+            if (!wasSubscribed.compareAndSet(false, true))
+                subscriber.onError(new IllegalStateException("Scan publisher 
does not support multiple subscriptions."));
+
+            PartitionScanSubscription subscription = new 
PartitionScanSubscription(subscriber);
+
+            subscriber.onSubscribe(subscription);
+        }
+
+        /**
+         * Partition Scan Subscription.
+         */
+        private class PartitionScanSubscription implements Subscription {
+            /** */
+            private final Subscriber<? super BinaryRow> subscriber;
+
+            /** */
+            private final AtomicBoolean isCanceled;
+
+            /** Scan id to uniquely identify it on server side. */
+            private final IgniteUuid scanId;
+
+            /** Scan initial operation that created server cursor. */
+            private final CompletableFuture<Void> scanInitOp;
+
+            /**
+             * The constructor.
+             * @param subscriber The subscriber.
+             */
+            private PartitionScanSubscription(Subscriber<? super BinaryRow> 
subscriber) {
+                this.subscriber = subscriber;
+                this.isCanceled = new AtomicBoolean(false);
+                this.scanId = UUID_GENERATOR.randomUuid();
+                // TODO: IGNITE-15544 Close partition scans on node left.
+                this.scanInitOp = raftGrpSvc.run(new ScanInitCommand("", 
scanId));
+            }
+
+            /** {@inheritDoc} */
+            @Override public void request(long n) {
+                if (n <= 0) {
+                    cancel();
+
+                    subscriber.onError(new 
IllegalArgumentException(LoggerMessageHelper.
+                        format("Invalid requested amount of items 
[requested={}, minValue=1]", n))
+                    );
+                }
+
+                if (isCanceled.get())
+                    return;
+
+                final int internalBatchSize = Integer.MAX_VALUE;
+
+                for (int intBatchCnr = 0; intBatchCnr < (n / 
internalBatchSize); intBatchCnr++)
+                    scanBatch(internalBatchSize);
+
+                scanBatch((int)(n % internalBatchSize));
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                cancel(true);
+            }
+
+            /**
+             * Cancels given subscription and closes cursor if necessary.
+             *
+             * @param closeCursor If {@code true} closes inner storage scan.
+             */
+            private void cancel(boolean closeCursor) {
+                isCanceled.set(true);
+
+                if (closeCursor) {
+                    raftGrpSvc.run(new 
ScanCloseCommand(scanId)).exceptionally(closeT -> {
+                        LOG.warn("Unable to close scan.", closeT);
+
+                        return null;
+                    });
+                }
+            }
+
+            /**
+             * Requests and processes n requested elements where n is an 
integer.
+             *
+             * @param n Requested amount of items.
+             */
+            private void scanBatch(int n) {
+                if (isCanceled.get())
+                    return;
+
+                scanInitOp.thenCompose((none) -> 
raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(n, scanId)))

Review comment:
       > We need to check isCanceled flag right before invocation of raftGrpSvc 
.run(ScanRetrieveBatchCommand)
   
   Without extra synchronization that won't help.

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +336,146 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Partition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** {@link Publisher<BinaryRow>} that relatively notifies about 
partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /** */
+        private AtomicBoolean wasSubscribed;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link RaftGroupService} to run corresponding 
raft commands.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.raftGrpSvc = raftGrpSvc;
+            this.wasSubscribed = new AtomicBoolean(false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void subscribe(Subscriber<? super BinaryRow> 
subscriber) {
+            if (subscriber == null)
+                throw new NullPointerException("Subscriber is null");
+
+            if (!wasSubscribed.compareAndSet(false, true))
+                subscriber.onError(new IllegalStateException("Scan publisher 
does not support multiple subscriptions."));
+
+            PartitionScanSubscription subscription = new 
PartitionScanSubscription(subscriber);
+
+            subscriber.onSubscribe(subscription);
+        }
+
+        /**
+         * Partition Scan Subscription.
+         */
+        private class PartitionScanSubscription implements Subscription {
+            /** */
+            private final Subscriber<? super BinaryRow> subscriber;
+
+            /** */
+            private final AtomicBoolean isCanceled;
+
+            /** Scan id to uniquely identify it on server side. */
+            private final IgniteUuid scanId;
+
+            /** Scan initial operation that created server cursor. */
+            private final CompletableFuture<Void> scanInitOp;
+
+            /**
+             * The constructor.
+             * @param subscriber The subscriber.
+             */
+            private PartitionScanSubscription(Subscriber<? super BinaryRow> 
subscriber) {
+                this.subscriber = subscriber;
+                this.isCanceled = new AtomicBoolean(false);
+                this.scanId = UUID_GENERATOR.randomUuid();
+                // TODO: IGNITE-15544 Close partition scans on node left.
+                this.scanInitOp = raftGrpSvc.run(new ScanInitCommand("", 
scanId));
+            }
+
+            /** {@inheritDoc} */
+            @Override public void request(long n) {
+                if (n <= 0) {
+                    cancel();
+
+                    subscriber.onError(new 
IllegalArgumentException(LoggerMessageHelper.
+                        format("Invalid requested amount of items 
[requested={}, minValue=1]", n))
+                    );
+                }
+
+                if (isCanceled.get())
+                    return;
+
+                final int internalBatchSize = Integer.MAX_VALUE;
+
+                for (int intBatchCnr = 0; intBatchCnr < (n / 
internalBatchSize); intBatchCnr++)
+                    scanBatch(internalBatchSize);
+
+                scanBatch((int)(n % internalBatchSize));
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                cancel(true);
+            }
+
+            /**
+             * Cancels given subscription and closes cursor if necessary.
+             *
+             * @param closeCursor If {@code true} closes inner storage scan.
+             */
+            private void cancel(boolean closeCursor) {
+                isCanceled.set(true);
+
+                if (closeCursor) {
+                    raftGrpSvc.run(new 
ScanCloseCommand(scanId)).exceptionally(closeT -> {
+                        LOG.warn("Unable to close scan.", closeT);
+
+                        return null;
+                    });
+                }
+            }
+
+            /**
+             * Requests and processes n requested elements where n is an 
integer.
+             *
+             * @param n Requested amount of items.
+             */
+            private void scanBatch(int n) {
+                if (isCanceled.get())
+                    return;
+
+                scanInitOp.thenCompose((none) -> 
raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(n, scanId)))

Review comment:
       > We need to check isCanceled flag right before invocation of raftGrpSvc 
.run(ScanRetrieveBatchCommand)
   
   Without extra synchronization that won't help.
   
   However, I believe it's not a big deal either. According to 
`Subscription#cancel`, cancel is a sort of eventually stuff
   ```
   Causes the Subscriber to (eventually) stop receiving messages. 
Implementation is best-effort -- additional messages may be received after 
invoking this method. A cancelled subscription need not ever receive an 
onComplete or onError signal.
   ```
   So in order to be complaint with Flow contract, onError() should be 
prevented in case `SCAN_INIT`, `SCAN_CLOSE`, `SCAN_RETRIEVE`. I mean that if 
`SCAN_CLOSE` removes the cursor and `SCAN_RETRIEVE` returns 
`NoSuchElementException` we shouldn't fire onError, following code was added to 
exceptionally
   ```
                       .exceptionally(
                           t -> {
                               if (t instanceof NoSuchElementException ||
                                   t instanceof CompletionException && 
t.getCause() instanceof NoSuchElementException)
                                   return null;
   ```

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -302,4 +336,146 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Partition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** {@link Publisher<BinaryRow>} that relatively notifies about 
partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /** */
+        private AtomicBoolean wasSubscribed;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link RaftGroupService} to run corresponding 
raft commands.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.raftGrpSvc = raftGrpSvc;
+            this.wasSubscribed = new AtomicBoolean(false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void subscribe(Subscriber<? super BinaryRow> 
subscriber) {
+            if (subscriber == null)
+                throw new NullPointerException("Subscriber is null");
+
+            if (!wasSubscribed.compareAndSet(false, true))
+                subscriber.onError(new IllegalStateException("Scan publisher 
does not support multiple subscriptions."));
+
+            PartitionScanSubscription subscription = new 
PartitionScanSubscription(subscriber);
+
+            subscriber.onSubscribe(subscription);
+        }
+
+        /**
+         * Partition Scan Subscription.
+         */
+        private class PartitionScanSubscription implements Subscription {
+            /** */
+            private final Subscriber<? super BinaryRow> subscriber;
+
+            /** */
+            private final AtomicBoolean isCanceled;
+
+            /** Scan id to uniquely identify it on server side. */
+            private final IgniteUuid scanId;
+
+            /** Scan initial operation that created server cursor. */
+            private final CompletableFuture<Void> scanInitOp;
+
+            /**
+             * The constructor.
+             * @param subscriber The subscriber.
+             */
+            private PartitionScanSubscription(Subscriber<? super BinaryRow> 
subscriber) {
+                this.subscriber = subscriber;
+                this.isCanceled = new AtomicBoolean(false);
+                this.scanId = UUID_GENERATOR.randomUuid();
+                // TODO: IGNITE-15544 Close partition scans on node left.
+                this.scanInitOp = raftGrpSvc.run(new ScanInitCommand("", 
scanId));
+            }
+
+            /** {@inheritDoc} */
+            @Override public void request(long n) {
+                if (n <= 0) {
+                    cancel();
+
+                    subscriber.onError(new 
IllegalArgumentException(LoggerMessageHelper.
+                        format("Invalid requested amount of items 
[requested={}, minValue=1]", n))
+                    );
+                }
+
+                if (isCanceled.get())
+                    return;
+
+                final int internalBatchSize = Integer.MAX_VALUE;
+
+                for (int intBatchCnr = 0; intBatchCnr < (n / 
internalBatchSize); intBatchCnr++)
+                    scanBatch(internalBatchSize);
+
+                scanBatch((int)(n % internalBatchSize));
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                cancel(true);
+            }
+
+            /**
+             * Cancels given subscription and closes cursor if necessary.
+             *
+             * @param closeCursor If {@code true} closes inner storage scan.
+             */
+            private void cancel(boolean closeCursor) {
+                isCanceled.set(true);
+
+                if (closeCursor) {
+                    raftGrpSvc.run(new 
ScanCloseCommand(scanId)).exceptionally(closeT -> {
+                        LOG.warn("Unable to close scan.", closeT);
+
+                        return null;
+                    });
+                }
+            }
+
+            /**
+             * Requests and processes n requested elements where n is an 
integer.
+             *
+             * @param n Requested amount of items.
+             */
+            private void scanBatch(int n) {
+                if (isCanceled.get())
+                    return;
+
+                scanInitOp.thenCompose((none) -> 
raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(n, scanId)))

Review comment:
       > We need to check isCanceled flag right before invocation of raftGrpSvc 
.run(ScanRetrieveBatchCommand)
   
   Without extra synchronization that won't help.
   
   However, I believe it's not a big deal either. According to 
`Subscription#cancel`, cancel is a sort of eventually stuff
   ```
   Causes the Subscriber to (eventually) stop receiving messages. 
Implementation is best-effort -- additional messages may be received after 
invoking this method. A cancelled subscription need not ever receive an 
onComplete or onError signal.
   ```
   So in order to be complaint with Flow contract, onError() should be 
prevented in case `SCAN_INIT`, `SCAN_CLOSE`, `SCAN_RETRIEVE`. I mean that if 
`SCAN_CLOSE` removes the cursor and `SCAN_RETRIEVE` returns 
`NoSuchElementException` we shouldn't fire onError, following code was added to 
exceptionally
   ```
                       .exceptionally(
                           t -> {
                               if (t instanceof NoSuchElementException ||
                                   t instanceof CompletionException && 
t.getCause() instanceof NoSuchElementException)
                                   return null;
   ```
   
   That's however won't work without 
https://issues.apache.org/jira/browse/IGNITE-15581




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to