AMashenkov commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r682464590



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -69,205 +53,69 @@ protected 
GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId
         GridCacheQueryManager<K, V> mgr = ctx.queries();
 
         assert mgr != null;
-
-        synchronized (this) {
-            for (ClusterNode node : nodes)
-                subgrid.add(node.id());
-        }
     }
 
     /** {@inheritDoc} */
-    @Override protected void cancelQuery() throws IgniteCheckedException {
-        final GridCacheQueryManager<K, V> qryMgr = cctx.queries();
-
-        assert qryMgr != null;
-
-        try {
-            Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
-            Collection<ClusterNode> nodes;
-
-            synchronized (this) {
-                nodes = F.retain(allNodes, true,
-                    new P1<ClusterNode>() {
-                        @Override public boolean apply(ClusterNode node) {
-                            return !cctx.localNodeId().equals(node.id()) && 
subgrid.contains(node.id());
-                        }
-                    }
-                );
-
-                subgrid.clear();
-            }
-
-            final GridCacheQueryRequest req = new 
GridCacheQueryRequest(cctx.cacheId(),
-                reqId,
-                fields(),
-                qryMgr.queryTopologyVersion(),
-                cctx.deploymentEnabled());
-
-            // Process cancel query directly (without sending) for local node,
-            cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
-                @Override public Object call() {
-                    qryMgr.processQueryRequest(cctx.localNodeId(), req);
+    @Override protected void cancelQuery() {
+        reducer.onCancel();
 
-                    return null;
-                }
-            });
-
-            if (!nodes.isEmpty()) {
-                for (ClusterNode node : nodes) {
-                    try {
-                        cctx.io().send(node, req, cctx.ioPolicy());
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (cctx.io().checkNodeLeft(node.id(), e, false)) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to send cancel request, node 
failed: " + node);
-                        }
-                        else
-                            U.error(log, "Failed to send cancel request 
[node=" + node + ']', e);
-                    }
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send cancel request (will cancel query in 
any case).", e);
-        }
-
-        qryMgr.onQueryFutureCanceled(reqId);
+        cctx.queries().onQueryFutureCanceled(reqId);
 
         clear();
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
     @Override protected void onNodeLeft(UUID nodeId) {
-        boolean callOnPage;
-
-        synchronized (this) {
-            callOnPage = !loc && subgrid.contains(nodeId);
-        }
+        boolean qryNode = reducer.mapNode(nodeId);
 
-        if (callOnPage)
-            onPage(nodeId, Collections.emptyList(),
+        if (qryNode)
+            onPage(nodeId, null,
                 new ClusterTopologyCheckedException("Remote node has left 
topology: " + nodeId), true);
     }
 
     /** {@inheritDoc} */
-    @Override public void awaitFirstPage() throws IgniteCheckedException {
-        try {
-            firstPageLatch.await();
+    @Override public void awaitFirstItemAvailable() throws 
IgniteCheckedException {
+        reducer.awaitInitialization();
 
-            if (isDone() && error() != null)
-                // Throw the exception if future failed.
-                get();
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
-        }
+        if (isDone() && error() != null)
+            // Throw the exception if future failed.
+            get();
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean onPage(UUID nodeId, boolean last) {
-        assert Thread.holdsLock(this);
-
-        if (!loc) {
-            rcvd.add(nodeId);
-
-            if (rcvd.containsAll(subgrid))
-                firstPageLatch.countDown();
-        }
-
-        boolean futFinish;
-
-        if (last) {
-            futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty());
-
-            if (futFinish)
-                firstPageLatch.countDown();
-        }
-        else
-            futFinish = false;
-
-        return futFinish;
+    @Override protected CacheQueryReducer<R> reducer() {
+        return reducer;
     }
 
-    /** {@inheritDoc} */
-    @Override protected void loadPage() {
-        assert !Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && rcvd.containsAll(subgrid)) {
-                rcvd.clear();
-
-                nodes = nodes();
-            }
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, false);
+    /** Set reducer. */
+    void reducer(DistributedCacheQueryReducer<R> reducer) {
+        this.reducer = reducer;
     }
 
     /** {@inheritDoc} */
-    @Override protected void loadAllPages() throws 
IgniteInterruptedCheckedException {
-        assert !Thread.holdsLock(this);
-
-        U.await(firstPageLatch);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && !subgrid.isEmpty())
-                nodes = nodes();
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, true);
-    }
-
-    /**
-     * @return Nodes to send requests to.
-     */
-    private Collection<ClusterNode> nodes() {
-        assert Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());
-
-        for (UUID nodeId : subgrid) {
-            ClusterNode node = cctx.discovery().node(nodeId);
-
-            if (node != null)
-                nodes.add(node);
-        }
-
-        return nodes;
+    @Override public Collection<R> get() throws IgniteCheckedException {
+        return get0();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        boolean done = super.onDone(res, err);
-
-        // Must release the lath after onDone() in order for a waiting thread 
to see an exception, if any.
-        firstPageLatch.countDown();
-
-        return done;
+    @Override public Collection<R> get(long timeout, TimeUnit unit) throws 
IgniteCheckedException {
+        return get0();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onCancelled() {
-        firstPageLatch.countDown();
-
-        return super.onCancelled();
+    @Override public Collection<R> getUninterruptibly() throws 
IgniteCheckedException {
+        return get0();
     }
 
-    /** {@inheritDoc} */
-    @Override public void onTimeout() {
-        firstPageLatch.countDown();
+    /**
+     * Completion of distributed query future depends on user that iterates 
over query result with lazy page loading.
+     * Then {@link #get()} can lock on unpredictably long period of time. So 
we should avoid call it.
+     */
+    private Collection<R> get0() throws IgniteCheckedException {
+        if (!isDone())
+            throw new IgniteIllegalStateException("Unexpected lock on iterator 
over distributed cache query result.");

Review comment:
       Why do you allow getting the result via the get() method, but forbid waiting?
   If no one is supposed to call get(), then it should throw an exception in any case.
   If the method is never called and you don't care, then overriding it is useless.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import 
org.apache.ignite.internal.processors.cache.query.CacheQueryPageRequester;
+import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Reducer of distirbuted query that sort result through all nodes. Note that 
it's assumed that every node
+ * returns pre-sorted collection of data.
+ */
+public class MergeSortDistributedCacheQueryReducer<R> extends 
AbstractDistributedCacheQueryReducer<R> {
+    /** Map of Node ID to stream. Used for inserting new pages. */
+    private final Map<UUID, NodePageStream<R>> streamsMap;
+
+    /** Array of streams. Used for iteration over result pages. */
+    private final NodePageStream<R>[] streams;
+
+    /** If {@code true} then there wasn't call {@link #hasNext()} on this 
reducer. */
+    private boolean first = true;
+
+    /**
+     * Offset of current stream to get next value. This offset only increments 
if a stream is done. The array {@link
+     * #streams} is sorted to put stream with lowest value on this offset.
+     */
+    private int streamOff;
+
+    /** Compares head of streams to get lowest value at the moment. */
+    private final Comparator<NodePageStream<R>> streamCmp;
+
+    /**
+     * @param fut Cache query future.
+     * @param reqId Cache query request ID.
+     * @param pageRequester Provides a functionality to request pages from 
remote nodes.
+     * @param queueLock Lock object that is shared between 
GridCacheQueryFuture and reducer.
+     * @param nodes Collection of nodes this query applies to.
+     * @param rowCmp Comparator to sort query results from different nodes.
+     */
+    public MergeSortDistributedCacheQueryReducer(
+        GridCacheQueryFutureAdapter fut, long reqId, CacheQueryPageRequester 
pageRequester,
+        Object queueLock, Collection<ClusterNode> nodes, Comparator<R> rowCmp
+    ) {
+        super(fut, reqId, pageRequester);
+
+        streamsMap = new ConcurrentHashMap<>(nodes.size());
+        streams = (NodePageStream<R>[])Array.newInstance(NodePageStream.class, 
nodes.size());
+
+        int i = 0;
+
+        for (ClusterNode node : nodes) {
+            streams[i] = new NodePageStream<>(fut.query().query(), queueLock, 
fut.endTime(), node.id(), (ns, all) ->
+                pageRequester.requestPages(reqId, fut, ns, all)
+            );
+
+            streamsMap.put(node.id(), streams[i++]);
+        }
+
+        streamCmp = (o1, o2) -> {
+            if (o1 == o2)
+                return 0;
+
+            if (o1 == null)
+                return -1;
+
+            if (o2 == null)
+                return 1;
+
+            return rowCmp.compare(o1.head(), o2.head());
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            if (streams[i].hasNext())
+                return true;
+
+            // Nullify obsolete stream, move left bound.
+            streamsMap.remove(streams[i].nodeId());
+            streams[i] = null;
+            streamOff++;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public R next() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            NodePageStream s = streams[i];
+
+            if (!s.hasNext()) {
+                // Nullify obsolete stream, move left bound.
+                streamsMap.remove(s.nodeId());
+                streams[i] = null;
+                streamOff++;
+            }
+        }
+
+        if (streamOff == streams.length)
+            throw new NoSuchElementException("No next element. Please, be sure 
to invoke hasNext() before next().");
+
+        if (first) {
+            first = false;
+
+            Arrays.sort(streams, streamCmp);
+        } else
+            bubbleUp(streams, streamOff, streamCmp);
+
+        return streams[streamOff].next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onPage(UUID nodeId, Collection<R> data, boolean 
last) {
+        NodePageStream<R> stream = streamsMap.get(nodeId);
+
+        if (stream == null)
+            return streamsMap.isEmpty();
+
+        stream.addPage(nodeId, data, last);
+
+        return last && (streamsMap.remove(nodeId) != null) && 
streamsMap.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError() {
+        for (NodePageStream<R> s: streamsMap.values())
+            s.onError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCancel() {
+        for (NodePageStream<R> s: streamsMap.values())
+            s.cancel(ns -> pageRequester.cancelQueryRequest(reqId, ns, 
fut.fields()));
+
+        streamsMap.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean mapNode(UUID nodeId) {
+        return streamsMap.containsKey(nodeId);
+    }
+
+    /**
+     * @param arr Array.
+     * @param off Offset.
+     * @param cmp Comparator.
+     */
+    private static <Z> void bubbleUp(Z[] arr, int off, Comparator<Z> cmp) {

Review comment:
       Why not use a log N algorithm here?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/PageStream.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Iterator over pages stream. Pages are stored in a queue. After polling a 
queue try load a new page instead of it.
+ */
+class PageStream<R> {
+    /** Queue of data of results pages. */
+    protected final Queue<Collection<R>> queue = new LinkedList<>();
+
+    /** Iterator over current page. */
+    private Iterator<R> iter;
+
+    /**
+     * Dynamic collection of nodes that run this query. If a node finishes 
this query then remove it from this colleciton.
+     */
+    private final Collection<UUID> subgrid;
+
+    /**
+     * List of nodes that respons with cache query result pages. This 
collection should be cleaned before sending new
+     * cache query request.
+     */
+    private final Collection<UUID> rcvd = new HashSet<>();

Review comment:
       Why not use only NodeStream and merge the results at a higher level?
   Maybe NodeStreams can even share the same queue for the results.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -101,12 +104,30 @@
     /** Event listener. */
     private GridLocalEventListener lsnr;
 
+    /** Requester of cache query result pages. */
+    private CacheQueryPageRequester pageRequester;
+
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
         super.start0();
 
         assert cctx.config().getCacheMode() != LOCAL;
 
+        pageRequester = new CacheQueryPageRequester(cctx) {
+            /** {@inheritDoc} */
+            @Override protected void sendLocal(GridCacheQueryRequest req) {

Review comment:
       This looks ambiguous.
   Why not just have a factory for the requests instead of a Requester?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -777,133 +724,32 @@ private Object convert(Object obj) {
     }
 
     /** {@inheritDoc} */
-    @Override public CacheQueryFuture<?> 
queryFieldsDistributed(GridCacheQueryBean qry,
-        Collection<ClusterNode> nodes) {
-        assert cctx.config().getCacheMode() != LOCAL;
-
-        if (log.isDebugEnabled())
-            log.debug("Executing distributed query: " + qry);
-
-        long reqId = cctx.io().nextIoId();
-
-        final GridCacheDistributedFieldsQueryFuture fut =
-            new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
-
-        try {
-            qry.query().validate();
-
-            GridCacheQueryRequest req = new GridCacheQueryRequest(
-                cctx.cacheId(),
-                reqId,
-                cctx.name(),
-                qry.query().type(),
-                true,
-                qry.query().clause(),
-                qry.query().limit(),
-                null,
-                null,
-                null,
-                qry.reducer(),
-                qry.transform(),
-                qry.query().pageSize(),
-                qry.query().includeBackups(),
-                qry.arguments(),
-                qry.query().includeMetadata(),
-                qry.query().keepBinary(),
-                qry.query().subjectId(),
-                qry.query().taskHash(),
-                queryTopologyVersion(),
-                null,
-                cctx.deploymentEnabled(),
-                qry.query().isDataPageScanEnabled());
-
-            addQueryFuture(req.id(), fut);
-
-            final Object topic = topic(cctx.nodeId(), req.id());
-
-            cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
-
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(false, topic);
-                }
-            });
-
-            sendRequest(fut, req, nodes);
-        }
-        catch (IgniteCheckedException e) {
-            fut.onDone(e);
-        }
-
-        return fut;
+    @Override public CacheQueryFuture<?> 
queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+        return queryDistributed(qry, nodes, true);
     }
 
-    /**
-     * Sends query request.
-     *
-     * @param fut Distributed future.
-     * @param req Request.
-     * @param nodes Nodes.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private void sendRequest(
-        final GridCacheDistributedQueryFuture<?, ?, ?> fut,
-        final GridCacheQueryRequest req,
-        Collection<ClusterNode> nodes
-    ) throws IgniteCheckedException {
-        assert fut != null;
-        assert req != null;
-        assert nodes != null;
-
-        final UUID locNodeId = cctx.localNodeId();
-
-        ClusterNode locNode = null;
-
-        Collection<ClusterNode> rmtNodes = null;
-
-        for (ClusterNode n : nodes) {
-            if (n.id().equals(locNodeId))
-                locNode = n;
-            else {
-                if (rmtNodes == null)
-                    rmtNodes = new ArrayList<>(nodes.size());
+    /** Creates a reducer depends on query type. */
+    private DistributedCacheQueryReducer createReducer(GridCacheQueryType 
qryType, long reqId,
+        GridCacheDistributedQueryFuture fut, Collection<ClusterNode> nodes) {
 
-                rmtNodes.add(n);
-            }
-        }
+        if (qryType == TEXT) {
+            Comparator<CacheEntryWithPayload<K, V, Float>> cmp = (c1, c2) -> {
+                if (c1.payload() == c2.payload())
+                    return 0;
 
-        // Request should be sent to remote nodes before the query is 
processed on the local node.
-        // For example, a remote reducer has a state, we should not serialize 
and then send
-        // the reducer changed by the local node.
-        if (!F.isEmpty(rmtNodes)) {
-            for (ClusterNode node : rmtNodes) {
-                try {
-                    cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    if (cctx.io().checkNodeLeft(node.id(), e, true)) {
-                        fut.onNodeLeft(node.id());
+                if (c1.payload() == null)

Review comment:
       AFAIU, only Text queries have a 'payload'. Why is it named 'payload' and 
not 'score'?
   Does a 'null' score make sense for the result?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -777,133 +724,32 @@ private Object convert(Object obj) {
     }
 
     /** {@inheritDoc} */
-    @Override public CacheQueryFuture<?> 
queryFieldsDistributed(GridCacheQueryBean qry,
-        Collection<ClusterNode> nodes) {
-        assert cctx.config().getCacheMode() != LOCAL;
-
-        if (log.isDebugEnabled())
-            log.debug("Executing distributed query: " + qry);
-
-        long reqId = cctx.io().nextIoId();
-
-        final GridCacheDistributedFieldsQueryFuture fut =
-            new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
-
-        try {
-            qry.query().validate();
-
-            GridCacheQueryRequest req = new GridCacheQueryRequest(
-                cctx.cacheId(),
-                reqId,
-                cctx.name(),
-                qry.query().type(),
-                true,
-                qry.query().clause(),
-                qry.query().limit(),
-                null,
-                null,
-                null,
-                qry.reducer(),
-                qry.transform(),
-                qry.query().pageSize(),
-                qry.query().includeBackups(),
-                qry.arguments(),
-                qry.query().includeMetadata(),
-                qry.query().keepBinary(),
-                qry.query().subjectId(),
-                qry.query().taskHash(),
-                queryTopologyVersion(),
-                null,
-                cctx.deploymentEnabled(),
-                qry.query().isDataPageScanEnabled());
-
-            addQueryFuture(req.id(), fut);
-
-            final Object topic = topic(cctx.nodeId(), req.id());
-
-            cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
-
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(false, topic);
-                }
-            });
-
-            sendRequest(fut, req, nodes);
-        }
-        catch (IgniteCheckedException e) {
-            fut.onDone(e);
-        }
-
-        return fut;
+    @Override public CacheQueryFuture<?> 
queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+        return queryDistributed(qry, nodes, true);
     }
 
-    /**
-     * Sends query request.
-     *
-     * @param fut Distributed future.
-     * @param req Request.
-     * @param nodes Nodes.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private void sendRequest(
-        final GridCacheDistributedQueryFuture<?, ?, ?> fut,
-        final GridCacheQueryRequest req,
-        Collection<ClusterNode> nodes
-    ) throws IgniteCheckedException {
-        assert fut != null;
-        assert req != null;
-        assert nodes != null;
-
-        final UUID locNodeId = cctx.localNodeId();
-
-        ClusterNode locNode = null;
-
-        Collection<ClusterNode> rmtNodes = null;
-
-        for (ClusterNode n : nodes) {
-            if (n.id().equals(locNodeId))
-                locNode = n;
-            else {
-                if (rmtNodes == null)
-                    rmtNodes = new ArrayList<>(nodes.size());
+    /** Creates a reducer depends on query type. */
+    private DistributedCacheQueryReducer createReducer(GridCacheQueryType 
qryType, long reqId,
+        GridCacheDistributedQueryFuture fut, Collection<ClusterNode> nodes) {
 
-                rmtNodes.add(n);
-            }
-        }
+        if (qryType == TEXT) {
+            Comparator<CacheEntryWithPayload<K, V, Float>> cmp = (c1, c2) -> {
+                if (c1.payload() == c2.payload())
+                    return 0;
 
-        // Request should be sent to remote nodes before the query is 
processed on the local node.
-        // For example, a remote reducer has a state, we should not serialize 
and then send
-        // the reducer changed by the local node.
-        if (!F.isEmpty(rmtNodes)) {
-            for (ClusterNode node : rmtNodes) {
-                try {
-                    cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    if (cctx.io().checkNodeLeft(node.id(), e, true)) {
-                        fut.onNodeLeft(node.id());
+                if (c1.payload() == null)

Review comment:
       AFAIU, only Text queries have a 'payload'. Why is it named 'payload' and 
not 'score'?
   Does a 'null' score make sense for the result? If not, then no NPE is possible.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -777,133 +724,32 @@ private Object convert(Object obj) {
     }
 
     /** {@inheritDoc} */
-    @Override public CacheQueryFuture<?> 
queryFieldsDistributed(GridCacheQueryBean qry,
-        Collection<ClusterNode> nodes) {
-        assert cctx.config().getCacheMode() != LOCAL;
-
-        if (log.isDebugEnabled())
-            log.debug("Executing distributed query: " + qry);
-
-        long reqId = cctx.io().nextIoId();
-
-        final GridCacheDistributedFieldsQueryFuture fut =
-            new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
-
-        try {
-            qry.query().validate();
-
-            GridCacheQueryRequest req = new GridCacheQueryRequest(
-                cctx.cacheId(),
-                reqId,
-                cctx.name(),
-                qry.query().type(),
-                true,
-                qry.query().clause(),
-                qry.query().limit(),
-                null,
-                null,
-                null,
-                qry.reducer(),
-                qry.transform(),
-                qry.query().pageSize(),
-                qry.query().includeBackups(),
-                qry.arguments(),
-                qry.query().includeMetadata(),
-                qry.query().keepBinary(),
-                qry.query().subjectId(),
-                qry.query().taskHash(),
-                queryTopologyVersion(),
-                null,
-                cctx.deploymentEnabled(),
-                qry.query().isDataPageScanEnabled());
-
-            addQueryFuture(req.id(), fut);
-
-            final Object topic = topic(cctx.nodeId(), req.id());
-
-            cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
-
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(false, topic);
-                }
-            });
-
-            sendRequest(fut, req, nodes);
-        }
-        catch (IgniteCheckedException e) {
-            fut.onDone(e);
-        }
-
-        return fut;
+    @Override public CacheQueryFuture<?> 
queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+        return queryDistributed(qry, nodes, true);
     }
 
-    /**
-     * Sends query request.
-     *
-     * @param fut Distributed future.
-     * @param req Request.
-     * @param nodes Nodes.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private void sendRequest(
-        final GridCacheDistributedQueryFuture<?, ?, ?> fut,
-        final GridCacheQueryRequest req,
-        Collection<ClusterNode> nodes
-    ) throws IgniteCheckedException {
-        assert fut != null;
-        assert req != null;
-        assert nodes != null;
-
-        final UUID locNodeId = cctx.localNodeId();
-
-        ClusterNode locNode = null;
-
-        Collection<ClusterNode> rmtNodes = null;
-
-        for (ClusterNode n : nodes) {
-            if (n.id().equals(locNodeId))
-                locNode = n;
-            else {
-                if (rmtNodes == null)
-                    rmtNodes = new ArrayList<>(nodes.size());
+    /** Creates a reducer depends on query type. */
+    private DistributedCacheQueryReducer createReducer(GridCacheQueryType 
qryType, long reqId,
+        GridCacheDistributedQueryFuture fut, Collection<ClusterNode> nodes) {
 
-                rmtNodes.add(n);
-            }
-        }
+        if (qryType == TEXT) {
+            Comparator<CacheEntryWithPayload<K, V, Float>> cmp = (c1, c2) -> {
+                if (c1.payload() == c2.payload())
+                    return 0;
 
-        // Request should be sent to remote nodes before the query is 
processed on the local node.
-        // For example, a remote reducer has a state, we should not serialize 
and then send
-        // the reducer changed by the local node.
-        if (!F.isEmpty(rmtNodes)) {
-            for (ClusterNode node : rmtNodes) {
-                try {
-                    cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    if (cctx.io().checkNodeLeft(node.id(), e, true)) {
-                        fut.onNodeLeft(node.id());
+                if (c1.payload() == null)

Review comment:
       The Comparator instance can be static.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import 
org.apache.ignite.internal.processors.cache.query.CacheQueryPageRequester;
+import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Reducer of distirbuted query that sort result through all nodes. Note that 
it's assumed that every node
+ * returns pre-sorted collection of data.
+ */
+public class MergeSortDistributedCacheQueryReducer<R> extends 
AbstractDistributedCacheQueryReducer<R> {
+    /** Map of Node ID to stream. Used for inserting new pages. */
+    private final Map<UUID, NodePageStream<R>> streamsMap;
+
+    /** Array of streams. Used for iteration over result pages. */
+    private final NodePageStream<R>[] streams;
+
+    /** If {@code true} then there wasn't call {@link #hasNext()} on this 
reducer. */
+    private boolean first = true;
+
+    /**
+     * Offset of current stream to get next value. This offset only increments 
if a stream is done. The array {@link
+     * #streams} is sorted to put stream with lowest value on this offset.
+     */
+    private int streamOff;
+
+    /** Compares head of streams to get lowest value at the moment. */
+    private final Comparator<NodePageStream<R>> streamCmp;
+
+    /**
+     * @param fut Cache query future.
+     * @param reqId Cache query request ID.
+     * @param pageRequester Provides a functionality to request pages from 
remote nodes.
+     * @param queueLock Lock object that is shared between 
GridCacheQueryFuture and reducer.
+     * @param nodes Collection of nodes this query applies to.
+     * @param rowCmp Comparator to sort query results from different nodes.
+     */
+    public MergeSortDistributedCacheQueryReducer(
+        GridCacheQueryFutureAdapter fut, long reqId, CacheQueryPageRequester 
pageRequester,
+        Object queueLock, Collection<ClusterNode> nodes, Comparator<R> rowCmp
+    ) {
+        super(fut, reqId, pageRequester);
+
+        streamsMap = new ConcurrentHashMap<>(nodes.size());
+        streams = (NodePageStream<R>[])Array.newInstance(NodePageStream.class, 
nodes.size());
+
+        int i = 0;
+
+        for (ClusterNode node : nodes) {
+            streams[i] = new NodePageStream<>(fut.query().query(), queueLock, 
fut.endTime(), node.id(), (ns, all) ->
+                pageRequester.requestPages(reqId, fut, ns, all)
+            );
+
+            streamsMap.put(node.id(), streams[i++]);
+        }
+
+        streamCmp = (o1, o2) -> {
+            if (o1 == o2)
+                return 0;
+
+            if (o1 == null)
+                return -1;
+
+            if (o2 == null)
+                return 1;
+
+            return rowCmp.compare(o1.head(), o2.head());
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            if (streams[i].hasNext())
+                return true;
+
+            // Nullify obsolete stream, move left bound.
+            streamsMap.remove(streams[i].nodeId());
+            streams[i] = null;
+            streamOff++;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public R next() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            NodePageStream s = streams[i];
+
+            if (!s.hasNext()) {
+                // Nullify obsolete stream, move left bound.
+                streamsMap.remove(s.nodeId());
+                streams[i] = null;
+                streamOff++;
+            }
+        }
+
+        if (streamOff == streams.length)
+            throw new NoSuchElementException("No next element. Please, be sure 
to invoke hasNext() before next().");
+
+        if (first) {
+            first = false;
+
+            Arrays.sort(streams, streamCmp);
+        } else
+            bubbleUp(streams, streamOff, streamCmp);
+
+        return streams[streamOff].next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onPage(UUID nodeId, Collection<R> data, boolean 
last) {
+        NodePageStream<R> stream = streamsMap.get(nodeId);
+
+        if (stream == null)
+            return streamsMap.isEmpty();
+
+        stream.addPage(nodeId, data, last);
+
+        return last && (streamsMap.remove(nodeId) != null) && 
streamsMap.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError() {
+        for (NodePageStream<R> s: streamsMap.values())
+            s.onError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCancel() {
+        for (NodePageStream<R> s: streamsMap.values())
+            s.cancel(ns -> pageRequester.cancelQueryRequest(reqId, ns, 
fut.fields()));
+
+        streamsMap.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean mapNode(UUID nodeId) {
+        return streamsMap.containsKey(nodeId);
+    }
+
+    /**
+     * @param arr Array.
+     * @param off Offset.
+     * @param cmp Comparator.
+     */
+    private static <Z> void bubbleUp(Z[] arr, int off, Comparator<Z> cmp) {

Review comment:
       Why not use an O(log N) algorithm here?
   E.g. use a min-heap instead.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import 
org.apache.ignite.internal.processors.cache.query.CacheQueryPageRequester;
+import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Reducer of distirbuted query that sort result through all nodes. Note that 
it's assumed that every node
+ * returns pre-sorted collection of data.
+ */
+public class MergeSortDistributedCacheQueryReducer<R> extends 
AbstractDistributedCacheQueryReducer<R> {
+    /** Map of Node ID to stream. Used for inserting new pages. */
+    private final Map<UUID, NodePageStream<R>> streamsMap;
+
+    /** Array of streams. Used for iteration over result pages. */
+    private final NodePageStream<R>[] streams;
+
+    /** If {@code true} then there wasn't call {@link #hasNext()} on this 
reducer. */
+    private boolean first = true;
+
+    /**
+     * Offset of current stream to get next value. This offset only increments 
if a stream is done. The array {@link
+     * #streams} is sorted to put stream with lowest value on this offset.
+     */
+    private int streamOff;
+
+    /** Compares head of streams to get lowest value at the moment. */
+    private final Comparator<NodePageStream<R>> streamCmp;
+
+    /**
+     * @param fut Cache query future.
+     * @param reqId Cache query request ID.
+     * @param pageRequester Provides a functionality to request pages from 
remote nodes.
+     * @param queueLock Lock object that is shared between 
GridCacheQueryFuture and reducer.
+     * @param nodes Collection of nodes this query applies to.
+     * @param rowCmp Comparator to sort query results from different nodes.
+     */
+    public MergeSortDistributedCacheQueryReducer(
+        GridCacheQueryFutureAdapter fut, long reqId, CacheQueryPageRequester 
pageRequester,
+        Object queueLock, Collection<ClusterNode> nodes, Comparator<R> rowCmp
+    ) {
+        super(fut, reqId, pageRequester);
+
+        streamsMap = new ConcurrentHashMap<>(nodes.size());
+        streams = (NodePageStream<R>[])Array.newInstance(NodePageStream.class, 
nodes.size());
+
+        int i = 0;
+
+        for (ClusterNode node : nodes) {
+            streams[i] = new NodePageStream<>(fut.query().query(), queueLock, 
fut.endTime(), node.id(), (ns, all) ->
+                pageRequester.requestPages(reqId, fut, ns, all)
+            );
+
+            streamsMap.put(node.id(), streams[i++]);
+        }
+
+        streamCmp = (o1, o2) -> {
+            if (o1 == o2)
+                return 0;
+
+            if (o1 == null)
+                return -1;
+
+            if (o2 == null)
+                return 1;
+
+            return rowCmp.compare(o1.head(), o2.head());
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            if (streams[i].hasNext())
+                return true;
+
+            // Nullify obsolete stream, move left bound.
+            streamsMap.remove(streams[i].nodeId());
+            streams[i] = null;
+            streamOff++;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public R next() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            NodePageStream s = streams[i];
+
+            if (!s.hasNext()) {
+                // Nullify obsolete stream, move left bound.
+                streamsMap.remove(s.nodeId());
+                streams[i] = null;
+                streamOff++;
+            }
+        }
+
+        if (streamOff == streams.length)
+            throw new NoSuchElementException("No next element. Please, be sure 
to invoke hasNext() before next().");
+
+        if (first) {
+            first = false;
+
+            Arrays.sort(streams, streamCmp);
+        } else
+            bubbleUp(streams, streamOff, streamCmp);
+
+        return streams[streamOff].next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onPage(UUID nodeId, Collection<R> data, boolean 
last) {
+        NodePageStream<R> stream = streamsMap.get(nodeId);
+
+        if (stream == null)
+            return streamsMap.isEmpty();
+
+        stream.addPage(nodeId, data, last);
+
+        return last && (streamsMap.remove(nodeId) != null) && 
streamsMap.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError() {
+        for (NodePageStream<R> s: streamsMap.values())
+            s.onError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCancel() {
+        for (NodePageStream<R> s: streamsMap.values())
+            s.cancel(ns -> pageRequester.cancelQueryRequest(reqId, ns, 
fut.fields()));
+
+        streamsMap.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean mapNode(UUID nodeId) {
+        return streamsMap.containsKey(nodeId);
+    }
+
+    /**
+     * @param arr Array.
+     * @param off Offset.
+     * @param cmp Comparator.
+     */
+    private static <Z> void bubbleUp(Z[] arr, int off, Comparator<Z> cmp) {

Review comment:
       Why not use an O(log N) algorithm here?
   E.g. use a min-heap structure instead.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -777,133 +724,32 @@ private Object convert(Object obj) {
     }
 
     /** {@inheritDoc} */
-    @Override public CacheQueryFuture<?> 
queryFieldsDistributed(GridCacheQueryBean qry,
-        Collection<ClusterNode> nodes) {
-        assert cctx.config().getCacheMode() != LOCAL;
-
-        if (log.isDebugEnabled())
-            log.debug("Executing distributed query: " + qry);
-
-        long reqId = cctx.io().nextIoId();
-
-        final GridCacheDistributedFieldsQueryFuture fut =
-            new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
-
-        try {
-            qry.query().validate();
-
-            GridCacheQueryRequest req = new GridCacheQueryRequest(
-                cctx.cacheId(),
-                reqId,
-                cctx.name(),
-                qry.query().type(),
-                true,
-                qry.query().clause(),
-                qry.query().limit(),
-                null,
-                null,
-                null,
-                qry.reducer(),
-                qry.transform(),
-                qry.query().pageSize(),
-                qry.query().includeBackups(),
-                qry.arguments(),
-                qry.query().includeMetadata(),
-                qry.query().keepBinary(),
-                qry.query().subjectId(),
-                qry.query().taskHash(),
-                queryTopologyVersion(),
-                null,
-                cctx.deploymentEnabled(),
-                qry.query().isDataPageScanEnabled());
-
-            addQueryFuture(req.id(), fut);
-
-            final Object topic = topic(cctx.nodeId(), req.id());
-
-            cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
-
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(false, topic);
-                }
-            });
-
-            sendRequest(fut, req, nodes);
-        }
-        catch (IgniteCheckedException e) {
-            fut.onDone(e);
-        }
-
-        return fut;
+    @Override public CacheQueryFuture<?> 
queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+        return queryDistributed(qry, nodes, true);
     }
 
-    /**
-     * Sends query request.
-     *
-     * @param fut Distributed future.
-     * @param req Request.
-     * @param nodes Nodes.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private void sendRequest(
-        final GridCacheDistributedQueryFuture<?, ?, ?> fut,
-        final GridCacheQueryRequest req,
-        Collection<ClusterNode> nodes
-    ) throws IgniteCheckedException {
-        assert fut != null;
-        assert req != null;
-        assert nodes != null;
-
-        final UUID locNodeId = cctx.localNodeId();
-
-        ClusterNode locNode = null;
-
-        Collection<ClusterNode> rmtNodes = null;
-
-        for (ClusterNode n : nodes) {
-            if (n.id().equals(locNodeId))
-                locNode = n;
-            else {
-                if (rmtNodes == null)
-                    rmtNodes = new ArrayList<>(nodes.size());
+    /** Creates a reducer depends on query type. */
+    private DistributedCacheQueryReducer createReducer(GridCacheQueryType 
qryType, long reqId,
+        GridCacheDistributedQueryFuture fut, Collection<ClusterNode> nodes) {
 
-                rmtNodes.add(n);
-            }
-        }
+        if (qryType == TEXT) {
+            Comparator<CacheEntryWithPayload<K, V, Float>> cmp = (c1, c2) -> {
+                if (c1.payload() == c2.payload())
+                    return 0;
 
-        // Request should be sent to remote nodes before the query is 
processed on the local node.
-        // For example, a remote reducer has a state, we should not serialize 
and then send
-        // the reducer changed by the local node.
-        if (!F.isEmpty(rmtNodes)) {
-            for (ClusterNode node : rmtNodes) {
-                try {
-                    cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    if (cctx.io().checkNodeLeft(node.id(), e, true)) {
-                        fut.onNodeLeft(node.id());
+                if (c1.payload() == null)

Review comment:
       The Comparator instance can be static or moved into the Reducer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to